From 22e9a84152a082586a20fa10f5d22b4b074478dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= Date: Tue, 9 Dec 2025 08:53:57 +0000 Subject: [PATCH 1/7] Added source optimization --- init/config2args.py | 3 ++ init/schema.json | 3 ++ src/plugins/storage/cache/src/cache.cpp | 38 +++++++++++++++++++++---- src/plugins/storage/cache/src/cache.hpp | 18 ++++++++++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/init/config2args.py b/init/config2args.py index 9789b0c2d..d169da70d 100755 --- a/init/config2args.py +++ b/init/config2args.py @@ -333,6 +333,9 @@ def process_storage(config): cache_params.append(f"s={cache['size_exponent']}") if "line_size_exponent" in cache: cache_params.append(f"l={cache['line_size_exponent']}") + if "source_optimization" in cache: + so_value = "true" if cache['source_optimization'] else "false" + cache_params.append(f"so={so_value}") if cache_params: params.append(f"{';'.join(cache_params)}") diff --git a/init/schema.json b/init/schema.json index 3d2f9e9f7..9662c6ea7 100644 --- a/init/schema.json +++ b/init/schema.json @@ -406,6 +406,9 @@ "line_size_exponent": { "type": "integer", "minimum": 1 + }, + "source_optimization": { + "type": "boolean" } }, "additionalProperties": false diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index bda8bb4ac..83295362e 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -163,6 +163,7 @@ NHTFlowCache::NHTFlowCache(const std::string& params, ipx_ring_t* queue) , m_inactive(0) , m_split_biflow(false) , m_enable_fragmentation_cache(true) + , m_source_optimization_enabled(false) , m_keylen(0) , m_key() , m_key_inv() @@ -220,7 +221,7 @@ void NHTFlowCache::init(const char* params) m_split_biflow = parser.m_split_biflow; m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; - + m_source_optimization_enabled = parser.m_source_optimization_enabled; if 
(m_enable_fragmentation_cache) { try { m_fragmentation_cache @@ -367,7 +368,15 @@ int NHTFlowCache::put_pkt(Packet& pkt) } } } - + // Set all source/destination ports to 0 for source optimization based on flow direction + // this will consolidate more flows into single flow record + if (m_source_optimization_enabled) { + if( source_flow ) { + pkt.src_port = 0; + } else { + pkt.dst_port = 0; + } + } if (found) { /* Existing flow record was found, put flow record at the first index of flow line. */ #ifdef FLOW_CACHE_STATS @@ -436,6 +445,7 @@ int NHTFlowCache::put_pkt(Packet& pkt) if (flow->is_empty()) { m_flows_in_cache++; flow->create(pkt, hashval); + ret = plugins_post_create(flow->m_flow, pkt); if (ret & FLOW_FLUSH) { @@ -526,7 +536,11 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4->proto = pkt.ip_proto; key_v4->ip_version = IP::v4; - key_v4->src_port = pkt.src_port; + if (m_source_optimization_enabled) { + key_v4->src_port = 0; + } else { + key_v4->src_port = pkt.src_port; + } key_v4->dst_port = pkt.dst_port; key_v4->src_ip = pkt.src_ip.v4; key_v4->dst_ip = pkt.dst_ip.v4; @@ -534,7 +548,11 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4_inv->proto = pkt.ip_proto; key_v4_inv->ip_version = IP::v4; - key_v4_inv->src_port = pkt.dst_port; + if (m_source_optimization_enabled) { + key_v4_inv->src_port = 0; + } else { + key_v4_inv->src_port = pkt.dst_port; + } key_v4_inv->dst_port = pkt.src_port; key_v4_inv->src_ip = pkt.dst_ip.v4; key_v4_inv->dst_ip = pkt.src_ip.v4; @@ -548,7 +566,11 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v6->proto = pkt.ip_proto; key_v6->ip_version = IP::v6; - key_v6->src_port = pkt.src_port; + if (m_source_optimization_enabled) { + key_v6->src_port = 0; + } else { + key_v6->src_port = pkt.src_port; + } key_v6->dst_port = pkt.dst_port; memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); @@ -556,7 +578,11 @@ bool 
NHTFlowCache::create_hash_key(Packet& pkt) key_v6_inv->proto = pkt.ip_proto; key_v6_inv->ip_version = IP::v6; - key_v6_inv->src_port = pkt.dst_port; + if (m_source_optimization_enabled) { + key_v6_inv->src_port = 0; + } else { + key_v6_inv->src_port = pkt.dst_port; + } key_v6_inv->dst_port = pkt.src_port; memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); diff --git a/src/plugins/storage/cache/src/cache.hpp b/src/plugins/storage/cache/src/cache.hpp index 704d6e402..ec97a7dcb 100644 --- a/src/plugins/storage/cache/src/cache.hpp +++ b/src/plugins/storage/cache/src/cache.hpp @@ -86,6 +86,7 @@ class CacheOptParser : public OptionsParser { uint32_t m_inactive; bool m_split_biflow; bool m_enable_fragmentation_cache; + bool m_source_optimization_enabled; std::size_t m_frag_cache_size; time_t m_frag_cache_timeout; @@ -97,6 +98,7 @@ class CacheOptParser : public OptionsParser { , m_inactive(DEFAULT_INACTIVE_TIMEOUT) , m_split_biflow(false) , m_enable_fragmentation_cache(true) + , m_source_optimization_enabled(false) , m_frag_cache_size(10007) , // Prime for better distribution in hash table m_frag_cache_timeout(3) @@ -217,6 +219,21 @@ class CacheOptParser : public OptionsParser { } return true; }); + register_option( + "so", + "source_optimization", + "true|false", + "Enable/disable source optimization e.g sets all source ports to 0. 
Disabled (false) by default.", + [this](const char* arg) { + if (strcmp(arg, "true") == 0) { + m_source_optimization_enabled = true; + } else if (strcmp(arg, "false") == 0) { + m_source_optimization_enabled = false; + } else { + return false; + } + return true; + }); } }; @@ -298,6 +315,7 @@ class NHTFlowCache uint32_t m_inactive; bool m_split_biflow; bool m_enable_fragmentation_cache; + bool m_source_optimization_enabled; uint8_t m_keylen; char m_key[MAX_KEY_LENGTH]; char m_key_inv[MAX_KEY_LENGTH]; From c506fc69ab86c8961dbeb0e5f83023993a9f8347 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= <9413089+JimmyBjorklund@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:18:48 +0100 Subject: [PATCH 2/7] This QinQ extension handles both the inner and outer VLAN tags This QinQ extension handles both the inner and outer VLAN tags and exports them as: ``` DOT1Q_VLAN_ID DOT1Q_CUSTOMER_VLAN_ID ``` --- http-test2.pcap | Bin 0 -> 1342 bytes include/ipfixprobe/ipfix-elements.hpp | 7 ++ include/ipfixprobe/packet.hpp | 2 + qinq.pcap | Bin 0 -> 1244 bytes src/plugins/input/parser/parser.cpp | 11 +- src/plugins/process/CMakeLists.txt | 1 + src/plugins/process/qinq/CMakeLists.txt | 27 +++++ src/plugins/process/qinq/README.md | 0 src/plugins/process/qinq/src/qinq.cpp | 56 ++++++++++ src/plugins/process/qinq/src/qinq.hpp | 104 ++++++++++++++++++ src/plugins/process/vlan/src/vlan.hpp | 6 +- src/plugins/storage/cache/src/cache.cpp | 4 + src/plugins/storage/cache/src/cache.hpp | 2 + .../fragmentationKeyData.hpp | 2 + 14 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 http-test2.pcap create mode 100644 qinq.pcap create mode 100644 src/plugins/process/qinq/CMakeLists.txt create mode 100644 src/plugins/process/qinq/README.md create mode 100644 src/plugins/process/qinq/src/qinq.cpp create mode 100644 src/plugins/process/qinq/src/qinq.hpp diff --git a/http-test2.pcap b/http-test2.pcap new file mode 100644 index
0000000000000000000000000000000000000000..c206fb72d179d0a0e7a31a307ab2aa14de10ee3e GIT binary patch literal 1342 zcmbW1O=#0#7{}jsU02@`*hJWFzVxU~lXmMmv$i_jm+q6bQxPv&@~(}vNll`-gD?l8 zOz<+qc@W$|Q1Kwe%Pun1i{cyuMTUrbIgJtth`n(838ueEtceci;gQWkk2@#Z-h5k1p;=taq)> zEo~6pjJRGV35z1>@8)ouY*UH!8hU_;&7_Sguo8}ta7YrPQiDi3ds7@vQu91DCDNLg zl?>%@k8Ig|uc2xq8dW87f~o-$imbmKBH{rl+$@P9=k4~CoR)|`O8JP|5FVzAI1-JH zsQ&c^@`JLTrj|s8M{R0x_=us~&}+Vj>S;S8kx;ma!^dTF6aFHd&G&A;OTI3<-IIyXE$3FdJ# z<6Qpy-^5xVc5Ejmzi$%DtcXzKR8z^^##Srdr~`8+#%f)wUyp8B(F<0bWEu;95z9_8 W5O@AJEGMAq`cl>L4UIRUYVRK)(Wheo literal 0 HcmV?d00001 diff --git a/include/ipfixprobe/ipfix-elements.hpp b/include/ipfixprobe/ipfix-elements.hpp index 76c15adf3..0a8322867 100644 --- a/include/ipfixprobe/ipfix-elements.hpp +++ b/include/ipfixprobe/ipfix-elements.hpp @@ -93,6 +93,8 @@ namespace ipxp { #define ETHERTYPE(F) F(0, 256, 2, nullptr) #define VLAN_ID(F) F(0, 58, 2, nullptr) +#define DOT1Q_VLAN_ID(F) F(0, 243, 2, nullptr) +#define DOT1Q_CUSTOMER_VLAN_ID(F) F(0, 245, 2, nullptr) #define L2_SRC_MAC(F) F(0, 56, 6, flow.src_mac) #define L2_DST_MAC(F) F(0, 80, 6, flow.dst_mac) @@ -564,6 +566,10 @@ namespace ipxp { #define IPFIX_VLAN_TEMPLATE(F) F(VLAN_ID) +#define IPFIX_QINQ_TEMPLATE(F) \ + F(DOT1Q_VLAN_ID) \ + F(DOT1Q_CUSTOMER_VLAN_ID) + #define IPFIX_NETTISA_TEMPLATE(F) \ F(NTS_MEAN) \ F(NTS_MIN) \ @@ -615,6 +621,7 @@ namespace ipxp { IPFIX_SSADETECTOR_TEMPLATE(F) \ IPFIX_ICMP_TEMPLATE(F) \ IPFIX_VLAN_TEMPLATE(F) \ + IPFIX_QINQ_TEMPLATE(F) \ IPFIX_NETTISA_TEMPLATE(F) \ IPFIX_FLOW_HASH_TEMPLATE(F) diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index c84baa534..9e69d4b9d 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -60,6 +60,7 @@ struct Packet : public Record { ipaddr_t src_ip; ipaddr_t dst_ip; uint32_t vlan_id; + uint32_t vlan_id2; uint32_t frag_id; uint16_t frag_off; bool more_fragments; @@ -119,6 +120,7 @@ struct Packet : public Record { , src_ip({0}) , dst_ip({0}) , 
vlan_id(0) + , vlan_id2(0) , frag_id(0) , frag_off(0) , more_fragments(false) diff --git a/qinq.pcap b/qinq.pcap new file mode 100644 index 0000000000000000000000000000000000000000..708c99bd4d0b027d74a088aee07282d616bd9f7f GIT binary patch literal 1244 zcmb8vJ4ixN7zgm9Tum!Tds*4T(%zO<)`cdQ8iGV3t3j8BA}50+Y_g%HDYb`MOVD7r zq-77VrN^iuNC`oMQ$b_VW9s`PI8$)$g>%n+@IOD!e>l0@_@S6^gkBsG!HfT!Uy3C- z;)G`$Nu84On&575JJtyi!{B6OjF9QnvzeGhb2p{%1%^aMBp(4kIoX00aMhGwjuTNR zE32w&YE^ah4eG|G<`zwBn^xD}(b?7A)2r|6Hy8&7hfKpGd;oHGm4rb~hl{e5%631< zG2fr;eitThL(a3zB4>;!&$_4_;z5pMYq(e50)OCQ-SZCQUehdcR)ez5OXWfy<^0mASC+0oxK`y$h!K{CEjN+xdl}JWAy|2gn!L8p;xFIZn)N zhau;QFv-U#PkvH4_yh7KwuYi%Lzuh|xg$6goRG!w|4H(cMR_7aW!C}7SJ)cLGX-Jt EFa6r^>Hq)$ literal 0 HcmV?d00001 diff --git a/src/plugins/input/parser/parser.cpp b/src/plugins/input/parser/parser.cpp index d2a731cea..9564b22ff 100644 --- a/src/plugins/input/parser/parser.cpp +++ b/src/plugins/input/parser/parser.cpp @@ -113,7 +113,7 @@ inline uint16_t parse_eth_hdr(const u_char* data_ptr, uint16_t data_len, Packet* // set the default value in case there is no VLAN ID pkt->vlan_id = 0; - + pkt->vlan_id2 = 0; if (ethertype == ETH_P_8021AD || ethertype == ETH_P_8021Q) { if (4 > data_len - hdr_len) { throw "Parser detected malformed packet"; @@ -137,7 +137,8 @@ inline uint16_t parse_eth_hdr(const u_char* data_ptr, uint16_t data_len, Packet* if (4 > data_len - hdr_len) { throw "Parser detected malformed packet"; } - DEBUG_CODE(uint16_t vlan = ntohs(*(uint16_t*) (data_ptr + hdr_len))); + uint16_t vlan = ntohs(*(uint16_t*) (data_ptr + hdr_len)); + pkt->vlan_id2 = vlan & 0x0FFF; DEBUG_MSG("\t802.1q field:\n"); DEBUG_MSG("\t\tPriority:\t%u\n", ((vlan & 0xE000) >> 12)); DEBUG_MSG("\t\tCFI:\t\t%u\n", ((vlan & 0x1000) >> 11)); @@ -771,6 +772,9 @@ void parse_packet( if (pkt->vlan_id) { stats.vlan_packets++; } + if( pkt->vlan_id2) { + stats.vlan_packets++; + } if (pkt->ethertype == ETH_P_IP) { stats.ipv4_packets++; @@ -802,6 +806,9 @@ void parse_packet( pkt->payload = 
pkt->packet + data_offset; stats.vlan_stats[pkt->vlan_id].update(*pkt); + if( pkt->vlan_id2) { + stats.vlan_stats[pkt->vlan_id2].update(*pkt); + } DEBUG_MSG("Payload length:\t%u\n", pkt->payload_len); DEBUG_MSG("Packet parser exits: packet parsed\n"); diff --git a/src/plugins/process/CMakeLists.txt b/src/plugins/process/CMakeLists.txt index a47322075..54197e5c5 100644 --- a/src/plugins/process/CMakeLists.txt +++ b/src/plugins/process/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(smtp) add_subdirectory(quic) add_subdirectory(tls) add_subdirectory(http) +add_subdirectory(qinq) if (ENABLE_PROCESS_EXPERIMENTAL) add_subdirectory(sip) diff --git a/src/plugins/process/qinq/CMakeLists.txt b/src/plugins/process/qinq/CMakeLists.txt new file mode 100644 index 000000000..50dd2f6e0 --- /dev/null +++ b/src/plugins/process/qinq/CMakeLists.txt @@ -0,0 +1,27 @@ +project(ipfixprobe-process-qinq VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-qinq plugin") + +add_library(ipfixprobe-process-qinq MODULE + src/qinq.cpp + src/qinq.hpp +) + +set_target_properties(ipfixprobe-process-qinq PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN YES +) + +target_include_directories(ipfixprobe-process-qinq PRIVATE + ${CMAKE_SOURCE_DIR}/include/ +) + +if(ENABLE_NEMEA) + target_link_libraries(ipfixprobe-process-qinq PRIVATE + -Wl,--whole-archive ipfixprobe-nemea-fields -Wl,--no-whole-archive + unirec::unirec + trap::trap + ) +endif() + +install(TARGETS ipfixprobe-process-qinq + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/process/" +) diff --git a/src/plugins/process/qinq/README.md b/src/plugins/process/qinq/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/process/qinq/src/qinq.cpp b/src/plugins/process/qinq/src/qinq.cpp new file mode 100644 index 000000000..4d8370f76 --- /dev/null +++ b/src/plugins/process/qinq/src/qinq.cpp @@ -0,0 +1,56 @@ +/** + * @file + * @brief Plugin for parsing basicplus traffic. 
+ * @author Jakub Antonín Štigler xstigl00@xstigl00@stud.fit.vut.cz + * @author Pavel Siska + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "qinq.hpp" + +#include + +#include +#include + +namespace ipxp { + +static const PluginManifest qinqPluginManifest = { + .name = "qinq", + .description = "QinQ process plugin for parsing QinQ traffic, outputs outer and inner VLAN IDs.", + .pluginVersion = "1.0.0", + .apiVersion = "1.0.0", + .usage = + []() { + OptionsParser parser("qinq", "Parse qinq traffic"); + parser.usage(std::cout); + }, +}; + +QinQPlugin::QinQPlugin(const std::string& params, int pluginID) + : ProcessPlugin(pluginID) +{ + init(params.c_str()); +} + +ProcessPlugin* QinQPlugin::copy() +{ + return new QinQPlugin(*this); +} + +int QinQPlugin::post_create(Flow& rec, const Packet& pkt) +{ + auto ext = new RecordExtQinQ(m_pluginID); + ext->vlan_id = pkt.vlan_id; + ext->vlan_id2 = pkt.vlan_id2; + rec.add_extension(ext); + return 0; +} + +static const PluginRegistrar qinqRegistrar(qinqPluginManifest); + +} // namespace ipxp diff --git a/src/plugins/process/qinq/src/qinq.hpp b/src/plugins/process/qinq/src/qinq.hpp new file mode 100644 index 000000000..ae59fa731 --- /dev/null +++ b/src/plugins/process/qinq/src/qinq.hpp @@ -0,0 +1,104 @@ +/** + * @file + * @brief Plugin for parsing basicplus traffic. + * @author Jakub Antonín Štigler xstigl00@xstigl00@stud.fit.vut.cz + * @author Pavel Siska + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include + +#ifdef WITH_NEMEA +#include "fields.h" +#endif + +#include +#include +#include + +#include +#include +#include +#include + +namespace ipxp { + +#define QINQ_UNIREC_TEMPLATE "DOT1Q_VLAN_ID,DOT1Q_CUSTOMER_VLAN_ID" + +UR_FIELDS( + uint16 VLAN_ID; + uint16 VLAN_ID2; +) + + +/** + * \brief Flow record extension header for storing parsed QinQ data. 
+ */ +struct RecordExtQinQ : public RecordExt { + // vlan id is in the host byte order + uint16_t vlan_id; + uint16_t vlan_id2; + RecordExtQinQ(int pluginID) + : RecordExt(pluginID) + , vlan_id(0) + , vlan_id2(0) + { + } + +#ifdef WITH_NEMEA + virtual void fill_unirec(ur_template_t* tmplt, void* record) + { + ur_set(tmplt, record, F_VLAN_ID, vlan_id); + ur_set(tmplt, record, F_VLAN_ID2, vlan_id2); + } + + const char* get_unirec_tmplt() const { return QINQ_UNIREC_TEMPLATE; } +#endif + + virtual int fill_ipfix(uint8_t* buffer, int size) + { + const int LEN = sizeof(vlan_id); + const int LEN2 = sizeof(vlan_id2); + if( size < (LEN + LEN2) ) { + return LEN; + } + *reinterpret_cast(buffer) = htons(vlan_id); + *reinterpret_cast(buffer + LEN) = htons(vlan_id2); + return (LEN + LEN2); + } + + const char** get_ipfix_tmplt() const + { + static const char* ipfix_qinq_template[] = {IPFIX_QINQ_TEMPLATE(IPFIX_FIELD_NAMES) NULL}; + return ipfix_qinq_template; + } + + std::string get_text() const + { + std::ostringstream out; + out << "DOT1Q_VLAN_ID=\"" << vlan_id << "\", DOT1Q_CUSTOMER_VLAN_ID=\"" << vlan_id2 << "\""; + return out.str(); + } +}; + +/** + * \brief Process plugin for parsing VLAN packets. 
+ */ +class QinQPlugin : public ProcessPlugin { +public: + QinQPlugin(const std::string& params, int pluginID); + OptionsParser* get_parser() const { return new OptionsParser("qinq", "Parse QinQ traffic"); } + std::string get_name() const { return "qinq"; } + RecordExt* get_ext() const { return new RecordExtQinQ(m_pluginID); } + ProcessPlugin* copy(); + + int post_create(Flow& rec, const Packet& pkt); +}; + +} // namespace ipxp diff --git a/src/plugins/process/vlan/src/vlan.hpp b/src/plugins/process/vlan/src/vlan.hpp index f12abfd14..adf651039 100644 --- a/src/plugins/process/vlan/src/vlan.hpp +++ b/src/plugins/process/vlan/src/vlan.hpp @@ -39,7 +39,7 @@ UR_FIELDS(uint16 VLAN_ID) struct RecordExtVLAN : public RecordExt { // vlan id is in the host byte order uint16_t vlan_id; - + uint16_t vlan_id2; RecordExtVLAN(int pluginID) : RecordExt(pluginID) , vlan_id(0) @@ -69,8 +69,8 @@ struct RecordExtVLAN : public RecordExt { const char** get_ipfix_tmplt() const { - static const char* ipfix_template[] = {IPFIX_VLAN_TEMPLATE(IPFIX_FIELD_NAMES) NULL}; - return ipfix_template; + static const char* ipfix_vlan_template[] = {IPFIX_VLAN_TEMPLATE(IPFIX_FIELD_NAMES) NULL}; + return ipfix_vlan_template; } std::string get_text() const diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index 83295362e..04893f9a6 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -545,6 +545,7 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4->src_ip = pkt.src_ip.v4; key_v4->dst_ip = pkt.dst_ip.v4; key_v4->vlan_id = pkt.vlan_id; + key_v4->vlan_id2 = pkt.vlan_id2; key_v4_inv->proto = pkt.ip_proto; key_v4_inv->ip_version = IP::v4; @@ -557,6 +558,7 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4_inv->src_ip = pkt.dst_ip.v4; key_v4_inv->dst_ip = pkt.src_ip.v4; key_v4_inv->vlan_id = pkt.vlan_id; + key_v4_inv->vlan_id2 = pkt.vlan_id2; m_keylen = sizeof(flow_key_v4_t); return true; @@ -575,6 +577,7 
@@ bool NHTFlowCache::create_hash_key(Packet& pkt) memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); key_v6->vlan_id = pkt.vlan_id; + key_v6->vlan_id2 = pkt.vlan_id2; key_v6_inv->proto = pkt.ip_proto; key_v6_inv->ip_version = IP::v6; @@ -587,6 +590,7 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); key_v6_inv->vlan_id = pkt.vlan_id; + key_v6_inv->vlan_id2 = pkt.vlan_id2; m_keylen = sizeof(flow_key_v6_t); return true; diff --git a/src/plugins/storage/cache/src/cache.hpp b/src/plugins/storage/cache/src/cache.hpp index ec97a7dcb..09f1b6462 100644 --- a/src/plugins/storage/cache/src/cache.hpp +++ b/src/plugins/storage/cache/src/cache.hpp @@ -34,6 +34,7 @@ struct __attribute__((packed)) flow_key_v4_t { uint32_t src_ip; uint32_t dst_ip; uint16_t vlan_id; + uint16_t vlan_id2; }; struct __attribute__((packed)) flow_key_v6_t { @@ -44,6 +45,7 @@ struct __attribute__((packed)) flow_key_v6_t { uint8_t src_ip[16]; uint8_t dst_ip[16]; uint16_t vlan_id; + uint16_t vlan_id2; }; #define MAX_KEY_LENGTH (max(sizeof(flow_key_v4_t), sizeof(flow_key_v6_t))) diff --git a/src/plugins/storage/cache/src/fragmentationCache/fragmentationKeyData.hpp b/src/plugins/storage/cache/src/fragmentationCache/fragmentationKeyData.hpp index 726764026..f0302e312 100644 --- a/src/plugins/storage/cache/src/fragmentationCache/fragmentationKeyData.hpp +++ b/src/plugins/storage/cache/src/fragmentationCache/fragmentationKeyData.hpp @@ -58,6 +58,7 @@ struct FragmentationKey { , destination_ip(packet.dst_ip) , fragmentation_id(packet.frag_id) , vlan_id(packet.vlan_id) + , vlan_id2(packet.vlan_id2) { } @@ -76,6 +77,7 @@ struct FragmentationKey { ipaddr_t destination_ip; ///< Destination IP address of the packet. uint32_t fragmentation_id; ///< Fragmentation ID of the packet. 
uint16_t vlan_id; ///< VLAN ID of the packet. + uint16_t vlan_id2; ///< Second VLAN ID of the packet. } __attribute__((packed)); /** From 912d60a7c12bb7aeedbcff17dac0ea650ced048e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= Date: Mon, 22 Dec 2025 08:07:12 +0000 Subject: [PATCH 3/7] Removed pcap files --- http-test2.pcap | Bin 1342 -> 0 bytes qinq.pcap | Bin 1244 -> 0 bytes 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 http-test2.pcap delete mode 100644 qinq.pcap diff --git a/http-test2.pcap b/http-test2.pcap deleted file mode 100644 index c206fb72d179d0a0e7a31a307ab2aa14de10ee3e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1342 zcmbW1O=#0#7{}jsU02@`*hJWFzVxU~lXmMmv$i_jm+q6bQxPv&@~(}vNll`-gD?l8 zOz<+qc@W$|Q1Kwe%Pun1i{cyuMTUrbIgJtth`n(838ueEtceci;gQWkk2@#Z-h5k1p;=taq)> zEo~6pjJRGV35z1>@8)ouY*UH!8hU_;&7_Sguo8}ta7YrPQiDi3ds7@vQu91DCDNLg zl?>%@k8Ig|uc2xq8dW87f~o-$imbmKBH{rl+$@P9=k4~CoR)|`O8JP|5FVzAI1-JH zsQ&c^@`JLTrj|s8M{R0x_=us~&}+Vj>S;S8kx;ma!^dTF6aFHd&G&A;OTI3<-IIyXE$3FdJ# z<6Qpy-^5xVc5Ejmzi$%DtcXzKR8z^^##Srdr~`8+#%f)wUyp8B(F<0bWEu;95z9_8 W5O@AJEGMAq`cl>L4UIRUYVRK)(Wheo diff --git a/qinq.pcap b/qinq.pcap deleted file mode 100644 index 708c99bd4d0b027d74a088aee07282d616bd9f7f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1244 zcmb8vJ4ixN7zgm9Tum!Tds*4T(%zO<)`cdQ8iGV3t3j8BA}50+Y_g%HDYb`MOVD7r zq-77VrN^iuNC`oMQ$b_VW9s`PI8$)$g>%n+@IOD!e>l0@_@S6^gkBsG!HfT!Uy3C- z;)G`$Nu84On&575JJtyi!{B6OjF9QnvzeGhb2p{%1%^aMBp(4kIoX00aMhGwjuTNR zE32w&YE^ah4eG|G<`zwBn^xD}(b?7A)2r|6Hy8&7hfKpGd;oHGm4rb~hl{e5%631< zG2fr;eitThL(a3zB4>;!&$_4_;z5pMYq(e50)OCQ-SZCQUehdcR)ez5OXWfy<^0mASC+0oxK`y$h!K{CEjN+xdl}JWAy|2gn!L8p;xFIZn)N zhau;QFv-U#PkvH4_yh7KwuYi%Lzuh|xg$6goRG!w|4H(cMR_7aW!C}7SJ)cLGX-Jt EFa6r^>Hq)$ From 224d736031e0f7764ec7765f2c8d9d5c732b307a Mon Sep 17 00:00:00 2001 From: Jimmy Bjorklund Date: Thu, 8 Jan 2026 11:03:20 +0100 Subject: [PATCH 4/7] Updated README --- 
README.md | 3 ++- src/plugins/process/qinq/README.md | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 234f7f968..0ac2cc5f4 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,8 @@ These plugins extract protocol-specific or behavioral information from packets a | [`smtp`](./src/plugins/process/smtp/README.md) | extracts SMTP envelope data (from, to, subject, etc.) | | [`ssaDetector`](./src/plugins/process/ssaDetector/README.md) | performs simple anomaly detection based on traffic patterns | | [`ssdp`](./src/plugins/process/ssdp/README.md) | parses SSDP (UPnP discovery) protocol | -| [`vlan`](./src/plugins/process/vlan/README.md) | extracts VLAN IDs and QinQ encapsulation | +| [`vlan`](./src/plugins/process/vlan/README.md) | extracts VLAN IDs | +| [`qinq`](./src/plugins/process/qinq/README.md) | extracts QinQ outer and inner VLAN IDs. | --- ### Output Plugins diff --git a/src/plugins/process/qinq/README.md b/src/plugins/process/qinq/README.md index e69de29bb..3bd869587 100644 --- a/src/plugins/process/qinq/README.md +++ b/src/plugins/process/qinq/README.md @@ -0,0 +1 @@ +QinQ process plugin for parsing QinQ traffic, outputs outer and inner VLAN IDs. \ No newline at end of file From d6f0cc52a8d24f78b1558d073a8ccbf82e63a463 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= Date: Wed, 28 Jan 2026 12:55:14 +0000 Subject: [PATCH 5/7] Fixed issue causing more records when source optimization is enabled; we want fewer records, not more. --- src/plugins/storage/cache/src/cache.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index 04893f9a6..d4b242d53 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -434,8 +434,11 @@ int NHTFlowCache::put_pkt(Packet& pkt) flow = m_flow_table[flow_index]; uint8_t flw_flags = source_flow ?
flow->m_flow.src_tcp_flags : flow->m_flow.dst_tcp_flags; - if ((pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { - // Flows with FIN or RST TCP flags are exported when new SYN packet arrives + if (!m_source_optimization_enabled && (pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { + // Flows with FIN or RST TCP flags are exported when new SYN packet arrives. + // When source optimization is enabled this case do not make any sence as the code would + // trigger a record push even thow we are trying to collect all data to a destination with + // in actvie timeout. m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_EOF; export_flow(flow_index); put_pkt(pkt); From 17d135a0b761d9d8d8c376a520536e46443e51ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= <9413089+JimmyBjorklund@users.noreply.github.com> Date: Fri, 13 Feb 2026 09:06:14 +0100 Subject: [PATCH 6/7] =?UTF-8?q?Changed=20code=20for=20source=20optimizatio?= =?UTF-8?q?n=20to=20get=20a=20inside=20network=20list=20to=E2=80=A6=20(#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Changed code for source optimization to take an inside-network list so that it can deterministically tell what is inside and what is not.
--- init/config2args.py | 31 +++ init/schema.json | 18 ++ src/plugins/storage/cache/CMakeLists.txt | 2 + src/plugins/storage/cache/src/cache.cpp | 67 +++-- src/plugins/storage/cache/src/cache.hpp | 23 +- .../storage/cache/src/sourceOptimization.cpp | 234 ++++++++++++++++++ .../storage/cache/src/sourceOptimization.hpp | 48 ++++ 7 files changed, 402 insertions(+), 21 deletions(-) create mode 100644 src/plugins/storage/cache/src/sourceOptimization.cpp create mode 100644 src/plugins/storage/cache/src/sourceOptimization.hpp diff --git a/init/config2args.py b/init/config2args.py index d169da70d..7d25508af 100755 --- a/init/config2args.py +++ b/init/config2args.py @@ -336,6 +336,37 @@ def process_storage(config): if "source_optimization" in cache: so_value = "true" if cache['source_optimization'] else "false" cache_params.append(f"so={so_value}") + if "source_optimization_network" in cache: + son_networks = cache.get("source_optimization_network") + if son_networks: + # Handle source_optimization_network - can be a list of dicts or a single dict + if isinstance(son_networks, dict): + # If it's a single dict with main/exclude, convert to list + son_networks = [son_networks] + elif not isinstance(son_networks, list): + son_networks = [son_networks] + + # Generate -son arguments for each network group + for network_group in son_networks: + if isinstance(network_group, dict): + main = network_group.get("main") + exclude = network_group.get("exclude", "") + + if main: + # Combine main network with exclusions + networks = [main.strip()] + if exclude: + # Handle exclude as comma-separated string + if isinstance(exclude, str): + excludes = [e.strip() for e in exclude.split(",")] + networks.extend(excludes) + elif isinstance(exclude, list): + networks.extend(exclude) + + cache_params.append(f"son={','.join(networks)}") + elif isinstance(network_group, str): + # If it's just a string, use it directly + cache_params.append(f"son={network_group}") if cache_params: 
params.append(f"{';'.join(cache_params)}") diff --git a/init/schema.json b/init/schema.json index 9662c6ea7..95cd06b56 100644 --- a/init/schema.json +++ b/init/schema.json @@ -409,6 +409,24 @@ }, "source_optimization": { "type": "boolean" + }, + "source_optimization_network": { + "type": "array", + "items": { + "type": "object", + "properties": { + "main": { + "type": "string" + }, + "exclude": { + "type": "string" + } + }, + "required": [ + "main" + ], + "additionalProperties": false + } } }, "additionalProperties": false diff --git a/src/plugins/storage/cache/CMakeLists.txt b/src/plugins/storage/cache/CMakeLists.txt index 2ee3d36c1..f9633dca4 100644 --- a/src/plugins/storage/cache/CMakeLists.txt +++ b/src/plugins/storage/cache/CMakeLists.txt @@ -12,6 +12,8 @@ add_library(ipfixprobe-storage-cache MODULE src/fragmentationCache/fragmentationTable.hpp src/fragmentationCache/ringBuffer.hpp src/fragmentationCache/timevalUtils.hpp + src/sourceOptimization.cpp + src/sourceOptimization.hpp src/xxhash.c src/xxhash.h ) diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index d4b242d53..e65eeb2aa 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -25,6 +25,7 @@ #include #include + namespace ipxp { static const PluginManifest cachePluginManifest = { @@ -163,7 +164,7 @@ NHTFlowCache::NHTFlowCache(const std::string& params, ipx_ring_t* queue) , m_inactive(0) , m_split_biflow(false) , m_enable_fragmentation_cache(true) - , m_source_optimization_enabled(false) + , m_source_optimization(NULL) , m_keylen(0) , m_key() , m_key_inv() @@ -178,6 +179,9 @@ NHTFlowCache::NHTFlowCache(const std::string& params, ipx_ring_t* queue) NHTFlowCache::~NHTFlowCache() { close(); + if( m_source_optimization ) { + delete m_source_optimization; + } } void NHTFlowCache::init(const char* params) @@ -221,7 +225,9 @@ void NHTFlowCache::init(const char* params) m_split_biflow = parser.m_split_biflow; 
m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; - m_source_optimization_enabled = parser.m_source_optimization_enabled; + if( parser.m_source_optimization_enabled ) { + m_source_optimization = new SourceOptimization( parser.m_source_optimization_networks ); + } if (m_enable_fragmentation_cache) { try { m_fragmentation_cache @@ -322,19 +328,30 @@ void NHTFlowCache::flush(Packet& pkt, size_t flow_index, int ret, bool source_fl int NHTFlowCache::put_pkt(Packet& pkt) { + source_optimization_mode_t mode = MODE_NONE; int ret = plugins_pre_create(pkt); if (m_enable_fragmentation_cache) { try_to_fill_ports_to_fragmented_packet(pkt); } - - if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key + if( m_source_optimization ) { + mode = m_source_optimization->get_mode(pkt); + } + if (!create_hash_key(pkt, mode)) { // saves key value and key length into attributes NHTFlowCache::key // and NHTFlowCache::m_keylen return 0; } prefetch_export_expired(); + if( m_source_optimization ) { + if( mode == MODE_SRC ) { + pkt.src_port = 0; + } + else if( mode == MODE_DST ) { + pkt.dst_port = 0; + } + } uint64_t hashval = XXH64(m_key, m_keylen, 0); /* Calculates hash value from key created before. */ @@ -370,13 +387,13 @@ int NHTFlowCache::put_pkt(Packet& pkt) } // Set all source/destination ports to 0 for source optimization based on flow direction // this will consolidate more flows into single flow record - if (m_source_optimization_enabled) { + /*if (m_source_optimization_enabled) { if( source_flow ) { pkt.src_port = 0; } else { pkt.dst_port = 0; } - } + }*/ if (found) { /* Existing flow record was found, put flow record at the first index of flow line. */ #ifdef FLOW_CACHE_STATS @@ -434,7 +451,7 @@ int NHTFlowCache::put_pkt(Packet& pkt) flow = m_flow_table[flow_index]; uint8_t flw_flags = source_flow ? 
flow->m_flow.src_tcp_flags : flow->m_flow.dst_tcp_flags; - if (!m_source_optimization_enabled && (pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { + if (!m_source_optimization && (pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { // Flows with FIN or RST TCP flags are exported when new SYN packet arrives. // When source optimization is enabled this case do not make any sence as the code would // trigger a record push even thow we are trying to collect all data to a destination with @@ -531,7 +548,7 @@ void NHTFlowCache::export_expired(time_t ts) m_timeout_idx = (m_timeout_idx + m_line_new_idx) & (m_cache_size - 1); } -bool NHTFlowCache::create_hash_key(Packet& pkt) +bool NHTFlowCache::create_hash_key(Packet& pkt, int mode) { if (pkt.ip_version == IP::v4) { struct flow_key_v4_t* key_v4 = reinterpret_cast(m_key); @@ -539,12 +556,17 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4->proto = pkt.ip_proto; key_v4->ip_version = IP::v4; - if (m_source_optimization_enabled) { + // We get called again by timeout and then we can not do this again. 
+ if( m_source_optimization && mode == MODE_SRC) { key_v4->src_port = 0; + key_v4->dst_port = pkt.dst_port; + } else if( m_source_optimization && mode == MODE_DST ) { + key_v4->src_port = pkt.src_port; + key_v4->dst_port = 0; } else { key_v4->src_port = pkt.src_port; + key_v4->dst_port = pkt.dst_port; } - key_v4->dst_port = pkt.dst_port; key_v4->src_ip = pkt.src_ip.v4; key_v4->dst_ip = pkt.dst_ip.v4; key_v4->vlan_id = pkt.vlan_id; @@ -552,12 +574,16 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v4_inv->proto = pkt.ip_proto; key_v4_inv->ip_version = IP::v4; - if (m_source_optimization_enabled) { + if( m_source_optimization && mode == MODE_SRC ) { key_v4_inv->src_port = 0; + key_v4_inv->dst_port = pkt.src_port; + } else if( m_source_optimization && mode == MODE_DST) { + key_v4_inv->src_port = pkt.dst_port; + key_v4_inv->dst_port = 0; } else { key_v4_inv->src_port = pkt.dst_port; + key_v4_inv->dst_port = pkt.src_port; } - key_v4_inv->dst_port = pkt.src_port; key_v4_inv->src_ip = pkt.dst_ip.v4; key_v4_inv->dst_ip = pkt.src_ip.v4; key_v4_inv->vlan_id = pkt.vlan_id; @@ -571,12 +597,17 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v6->proto = pkt.ip_proto; key_v6->ip_version = IP::v6; - if (m_source_optimization_enabled) { + if( m_source_optimization && mode == MODE_SRC ) { key_v6->src_port = 0; + key_v6->dst_port = pkt.dst_port; + } else if( m_source_optimization && mode == MODE_DST ){ + key_v6->src_port = pkt.src_port; + key_v6->dst_port = 0; } else { key_v6->src_port = pkt.src_port; + key_v6->dst_port = pkt.dst_port; } - key_v6->dst_port = pkt.dst_port; + memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); key_v6->vlan_id = pkt.vlan_id; @@ -584,12 +615,16 @@ bool NHTFlowCache::create_hash_key(Packet& pkt) key_v6_inv->proto = pkt.ip_proto; key_v6_inv->ip_version = IP::v6; - if (m_source_optimization_enabled) { + if (m_source_optimization && mode == MODE_SRC) { key_v6_inv->src_port 
= 0; + key_v6_inv->dst_port = pkt.src_port; + } else if( m_source_optimization && mode == MODE_DST) { + key_v6_inv->src_port = pkt.dst_port; + key_v6_inv->dst_port = 0; } else { + key_v6_inv->dst_port = pkt.src_port; key_v6_inv->src_port = pkt.dst_port; } - key_v6_inv->dst_port = pkt.src_port; memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); key_v6_inv->vlan_id = pkt.vlan_id; diff --git a/src/plugins/storage/cache/src/cache.hpp b/src/plugins/storage/cache/src/cache.hpp index 09f1b6462..d585edf69 100644 --- a/src/plugins/storage/cache/src/cache.hpp +++ b/src/plugins/storage/cache/src/cache.hpp @@ -15,7 +15,7 @@ #pragma once #include "fragmentationCache/fragmentationCache.hpp" - +#include "sourceOptimization.hpp" #include #include @@ -90,8 +90,8 @@ class CacheOptParser : public OptionsParser { bool m_enable_fragmentation_cache; bool m_source_optimization_enabled; std::size_t m_frag_cache_size; - time_t m_frag_cache_timeout; - + time_t m_frag_cache_timeout; + std::vector m_source_optimization_networks; CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table") , m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE) @@ -236,6 +236,19 @@ class CacheOptParser : public OptionsParser { } return true; }); + register_option( + "son", + "source_optimization_network", + "", + "network to include as inside and optional excluded subnets.", + [this](const char* arg) { + try { + m_source_optimization_networks.push_back(arg); + } catch (std::invalid_argument& e) { + return false; + } + return m_source_optimization_networks.size() > 0; + }); } }; @@ -317,7 +330,7 @@ class NHTFlowCache uint32_t m_inactive; bool m_split_biflow; bool m_enable_fragmentation_cache; - bool m_source_optimization_enabled; + SourceOptimization* m_source_optimization; uint8_t m_keylen; char m_key[MAX_KEY_LENGTH]; char m_key_inv[MAX_KEY_LENGTH]; @@ -330,7 +343,7 @@ class NHTFlowCache void 
try_to_fill_ports_to_fragmented_packet(Packet& packet); void flush(Packet& pkt, size_t flow_index, int ret, bool source_flow); - bool create_hash_key(Packet& pkt); + bool create_hash_key(Packet& pkt, int mode); void export_flow(size_t index); static uint8_t get_export_reason(Flow& flow); void finish(); diff --git a/src/plugins/storage/cache/src/sourceOptimization.cpp b/src/plugins/storage/cache/src/sourceOptimization.cpp new file mode 100644 index 000000000..6873d3558 --- /dev/null +++ b/src/plugins/storage/cache/src/sourceOptimization.cpp @@ -0,0 +1,234 @@ +/** + * Source optimization for cache plugin + * + * Author: Jimmy Björklund + * Copyright (C) 2024 LoLo Solutions Ltd + * Licensed under the MIT License (see LICENSE file for details) + * + * Check whether the source or destination IP is in a given CIDR range, optionally excluding some subranges. + * This is used to collect all flows to/from a specific destination into a single flow record in the cache and then export them together when the flow is exported. + * The main use case is when you want to limit the number of records exported and are only interested in where the traffic is going to/from rather than in individual flows. 
+ * + * storage: + * cache: + * source_optimization: true + * source_optimization_network: + * - main: "10.0.0.0/8" + * exclude: "10.0.2.1/32, 10.0.3.0/24" + * - main: "192.168.0.1/24" + * + * + */ +#include +#include +#include +#include +#include +#include +#include +#include "sourceOptimization.hpp" + +SourceOptimization::SourceOptimization() { + net_count = 0; + memset(&nets,0,sizeof(nets)); +} + +SourceOptimization::SourceOptimization(std::vector& vnets) { + net_count = 0; + memset(&nets,0,sizeof(nets)); + for( size_t i=0; i < vnets.size(); i++ ) { + std::string delimiter = ","; + std::string token; + std::string arg = vnets[i]; + bool main = true; + int ext = 0; + while ( arg.length() ) { + size_t pos = arg.find(delimiter); + token = arg.substr(0, pos); + if( main ) { + cidr_to_mask(token.c_str(), nets[net_count].cidr); + printf("Adding network for source optimization %s\n", token.c_str()); + main = false; + } else { + cidr_to_mask(token.c_str(), nets[net_count].cidr_exlude[ext]); + if( nets[net_count].cidr_exlude[ext].family != nets[net_count].cidr.family ) { + printf("Invalid network exclude range %s, family does not match main range.\n", token.c_str()); + break; + } + + if( nets[net_count].cidr.family == AF_INET && !ip_in_cidr(nets[net_count].cidr_exlude[ext].addr.v4, nets[net_count].cidr)) { + printf("Invalid network exclude range %s, need to be part of main range.\n", token.c_str()); + } else if( nets[net_count].cidr.family == AF_INET6 && !ip_in_cidr(nets[net_count].cidr_exlude[ext].addr.v6, nets[net_count].cidr)) { + printf("Invalid network exclude range %s, need to be part of main range.\n", token.c_str()); + } else { + printf("Exlude network for source optimization %s\n", token.c_str()); + } + } + if( pos == std::string::npos ) { + net_count++; + break; + } + arg.erase(0, pos + delimiter.length()); + } + } +} + +source_optimization_mode_t SourceOptimization::get_mode(ipxp::Packet& pkt) { + for( int i=0; i < net_count && i < MAX_CIDER_NETS; i++ ) { + 
if( pkt.ip_version == ipxp::IP::v4) { + u_int32_t src_ipv4 = ntohl(pkt.src_ip.v4); + u_int32_t dst_ipv4 = ntohl(pkt.dst_ip.v4); + if( ip_in_cidr( src_ipv4, nets[i].cidr ) ) { + bool excluded = false; + for ( int j=0; j < MAX_CIDER_EXLUDE && nets[i].cidr_exlude[j].mask.v4_mask != 0; j++ ) { + if( ip_in_cidr( src_ipv4, nets[i].cidr_exlude[j])) { + excluded = true; + break; + } + } + return excluded ? MODE_DST : MODE_SRC; + } else if( ip_in_cidr( dst_ipv4, nets[i].cidr)) { + bool excluded = false; + for ( int j=0; j < MAX_CIDER_EXLUDE && nets[i].cidr_exlude[j].mask.v4_mask != 0; j++ ) { + if( ip_in_cidr( dst_ipv4, nets[i].cidr_exlude[j])) { + excluded = true; + break; + } + } + return excluded ? MODE_SRC : MODE_DST; + } + } else { + if( ip_in_cidr( pkt.src_ip.v6, nets[i].cidr)) { + bool excluded = false; + for ( int j=0; j < MAX_CIDER_EXLUDE && nets[i].cidr_exlude[j].mask.v4_mask != 0; j++ ) { + if( ip_in_cidr( pkt.src_ip.v6, nets[i].cidr_exlude[j])) { + excluded = true; + break; + } + } + return excluded ? MODE_DST : MODE_SRC; + } else if( ip_in_cidr( pkt.dst_ip.v6, nets[i].cidr)) { + bool excluded = false; + for ( int j=0; j < MAX_CIDER_EXLUDE && nets[i].cidr_exlude[j].mask.v4_mask != 0; j++ ) { + if( ip_in_cidr( pkt.dst_ip.v6, nets[i].cidr_exlude[j])) { + excluded = true; + break; + } + } + return excluded ? 
MODE_SRC : MODE_DST; + } + } + } + return MODE_NONE; +} + +/** + * Convert an IPv4 or IPv6 address string to binary form and detect its family (IPv4 is stored as a host-order uint32, IPv6 as 16 raw bytes) + * Returns AF_INET or AF_INET6 on success, AF_UNSPEC on error (including buf_len < IP6_ADDR_LEN) + */ +int SourceOptimization::ip_to_binary(const char *ip_str, unsigned char *out_buf, size_t buf_len) { + if (buf_len < IP6_ADDR_LEN) return AF_UNSPEC; + + struct in_addr in4; + if (inet_pton(AF_INET, ip_str, &in4) == 1) { + uint32_t host_order = ntohl(in4.s_addr); // convert from network to host byte order + memcpy(out_buf, &host_order, 4); // stored in host byte order + return AF_INET; + } + + struct in6_addr in6; + if (inet_pton(AF_INET6, ip_str, &in6) == 1) { + memcpy(out_buf, in6.s6_addr, IP6_ADDR_LEN); // IPv6 bytes stay in network order and are treated as opaque + return AF_INET6; + } + return AF_UNSPEC; +} + +/** + * Parse CIDR string "ip/prefix" into cidr_mask + * Returns true on success + */ +bool SourceOptimization::cidr_to_mask(const char *cidr_str, cidr_mask& out) { + if (!cidr_str ) return false; + + char ip_part[128]; // large enough for IPv6 + int prefix = -1; + + if (sscanf(cidr_str, "%127[^/]/%d", ip_part, &prefix) != 2) { + return false; + } + + if (prefix < 0) return false; + + unsigned char addr_buf[IP6_ADDR_LEN]; + int family = ip_to_binary(ip_part, addr_buf, sizeof(addr_buf)); + + if (family == AF_UNSPEC) { + return false; + } + + out.family = family; + + if (family == AF_INET) { + if (prefix > 32) return false; + memcpy(&out.addr.v4, addr_buf, 4); + + if (prefix == 0) { + out.mask.v4_mask = 0; + } else { + out.mask.v4_mask = 0xFFFFFFFFU << (32 - prefix); + } + } else { // IPv6 + if (prefix > 128) return false; + memcpy(out.addr.v6, addr_buf, IP6_ADDR_LEN); + + // Build IPv6 mask: first 'prefix' bits set to 1 + memset(out.mask.v6_mask, 0, IP6_ADDR_LEN); + int full_bytes = prefix / 8; + int remain_bits = prefix % 8; + + memset(out.mask.v6_mask, 0xFF, full_bytes); + + if (remain_bits > 0 && full_bytes < IP6_ADDR_LEN) { + out.mask.v6_mask[full_bytes] = (0xFF << (8 - remain_bits)) & 0xFF; + } + } + return true; +} + +/** + * 
Check if ip_str is inside the given CIDR + * Returns true if it matches + */ +bool SourceOptimization::ip_in_cidr(const char *ip_str, const cidr_mask& cidr) { + + unsigned char ip_buf[IP6_ADDR_LEN]; + int family = ip_to_binary(ip_str, ip_buf, sizeof(ip_buf)); + + if (family == AF_UNSPEC ) { + printf("Invalid family\n"); + return false; + } + if( family != cidr.family ) { + return false; + } + if (family == AF_INET) { + uint32_t ip = *(uint32_t*)ip_buf; + return ip_in_cidr(ip, cidr); + } + // IPv6 + // Compare byte-by-byte after applying mask + return ip_in_cidr(ip_buf, cidr); +} +bool SourceOptimization::ip_in_cidr(uint32_t ipv4, const cidr_mask& cidr) { + return (ipv4 & cidr.mask.v4_mask) == (cidr.addr.v4 & cidr.mask.v4_mask); +} +bool SourceOptimization::ip_in_cidr(unsigned char ipv6[IP6_ADDR_LEN], const cidr_mask& cidr) { + // Compare byte-by-byte after applying mask + for (int i = 0; i < IP6_ADDR_LEN; i++) { + if ((ipv6[i] & cidr.mask.v6_mask[i]) != (cidr.addr.v6[i] & cidr.mask.v6_mask[i])) { + return false; + } + } + return true; +} \ No newline at end of file diff --git a/src/plugins/storage/cache/src/sourceOptimization.hpp b/src/plugins/storage/cache/src/sourceOptimization.hpp new file mode 100644 index 000000000..fb2971454 --- /dev/null +++ b/src/plugins/storage/cache/src/sourceOptimization.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include + +// IPv6 address is 128 bits = 16 bytes +#define IP6_ADDR_LEN 16 +#define MAX_CIDER_NETS 10 +#define MAX_CIDER_EXLUDE 10 + +typedef struct { + int family; + union { + uint32_t v4; // host byte order + unsigned char v6[IP6_ADDR_LEN]; // raw 16 bytes as written by inet_pton (network byte order) + } addr; + union { + uint32_t v4_mask; // host byte order + unsigned char v6_mask[IP6_ADDR_LEN]; + } mask; +} cidr_mask; + +typedef struct { + cidr_mask cidr; + cidr_mask cidr_exlude[MAX_CIDER_EXLUDE]; +} cidr_nets; + +typedef enum { + MODE_NONE = -1, + MODE_SRC = 1, + MODE_DST = 2 +} 
source_optimization_mode_t; + +class SourceOptimization { + public: + uint8_t net_count; + cidr_nets nets[MAX_CIDER_NETS]; + SourceOptimization(); + SourceOptimization(std::vector& vnets); + bool cidr_to_mask(const char *cidr_str, cidr_mask& out); + bool ip_in_cidr(const char *ip_str, const cidr_mask& cidr); + bool ip_in_cidr(uint32_t ipv4, const cidr_mask& cidr); + bool ip_in_cidr(unsigned char ipv6[IP6_ADDR_LEN], const cidr_mask& cidr); + int ip_to_binary(const char *ip_str, unsigned char *out_buf, size_t buf_len); + source_optimization_mode_t get_mode(ipxp::Packet& pkt); +}; From 28dcd2facedb44af799d3c9275b5f65f277f8d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Bj=C3=B6rklund?= <9413089+JimmyBjorklund@users.noreply.github.com> Date: Fri, 13 Feb 2026 09:10:12 +0100 Subject: [PATCH 7/7] Sync with CESNET (#4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dpdk: move RSS configuration into port configuration fix RSS configuration with i40e and multiple RX queues * dpdk: add rss argument to set user specific rss offload * ipfixprobed: add rss_offload option to DPDK input * dpdk: update README --------- Co-authored-by: Pavel Siska Co-authored-by: Pavel Siska Co-authored-by: Pavel Šiška --- init/config2args.py | 3 ++ init/link0.conf.example | 5 ++ init/schema.json | 10 +++- src/plugins/input/dpdk/README.md | 9 +++- src/plugins/input/dpdk/src/dpdk.cpp | 4 +- src/plugins/input/dpdk/src/dpdk.hpp | 17 +++++++ src/plugins/input/dpdk/src/dpdkDevice.cpp | 57 ++++++++++++++--------- src/plugins/input/dpdk/src/dpdkDevice.hpp | 7 ++- 8 files changed, 83 insertions(+), 29 deletions(-) diff --git a/init/config2args.py b/init/config2args.py index 7d25508af..2b5197244 100755 --- a/init/config2args.py +++ b/init/config2args.py @@ -157,6 +157,9 @@ def process_input_dpdk_plugin(settings): mtu = settings.get("mtu", 1518) if mtu is not None: primary_param += f"mtu={mtu};" + rss_offload = settings.get("rss_offload", None) + if 
rss_offload is not None: + primary_param += f"rss={rss_offload};" primary_param += f"eal={eal}\"" params = [] diff --git a/init/link0.conf.example b/init/link0.conf.example index 8e27a8e85..f6e265fec 100644 --- a/init/link0.conf.example +++ b/init/link0.conf.example @@ -35,6 +35,11 @@ input_plugin: eal_opts: null # EAL options (null = default options) mtu: null # Maximum Transmission Unit (defaults to RTE_ETHER_MAX_LEN) + # Our default RSS configuration is RTE_ETH_RSS_IP. + # Intel X710 (i40e) does not work reliably with it, so by default we use RSS provided by the NIC/driver. + # Set this explicitly to override the driver RSS configuration. + rss_offload: null + # Storage configuration (storage) storage: cache: diff --git a/init/schema.json b/init/schema.json index 95cd06b56..2e90e1d91 100644 --- a/init/schema.json +++ b/init/schema.json @@ -93,11 +93,11 @@ "ndp": { "type": "object", "properties": { - "device": { + "device": { "type": "array", "items": { "type": "string" - } + } }, "queues": { "type": "string" @@ -182,6 +182,12 @@ "string", "null" ] + }, + "rss_offload": { + "type": [ + "integer", + "null" + ] } }, "required": [ diff --git a/src/plugins/input/dpdk/README.md b/src/plugins/input/dpdk/README.md index b565710fd..3d5f20513 100644 --- a/src/plugins/input/dpdk/README.md +++ b/src/plugins/input/dpdk/README.md @@ -17,6 +17,7 @@ input_plugin: workers_cpu_list: [] eal_opts: null mtu: 1518 + rss_offload: null ``` ## Parameters @@ -36,6 +37,7 @@ input_plugin: |__workers_cpu_list__| [] (autofill) | List of CPU cores assigned to RX queues (must match number of rx_queues) | |__eal_opts__ | null | Extra options to be passed to the DPDK EAL (Environment Abstraction Layer). Can be used for fine-tuning DPDK behavior.| |__mtu__ | 1518 | Maximum Transmission Unit size for the interface. Defines the maximum packet size that can be received.| +|__rss_offload__ | null | RSS offload configuration. 
Can be used to override the default RSS offload configuration.| ## How to use @@ -240,9 +242,12 @@ grubby --update-kernel ALL --args "isolcpus=2-19,22-39" ``` -### 4. Validate with dpdk-testpmd +### 4. Troubleshooting -TODO +⚠️ RSS on Intel X710 (i40e) + +We observed that RSS on Intel X710 (i40e) may not distribute packets across multiple RX queues with the default RTE_ETH_RSS_IP. +For X710 (i40e) we use full RSS offload provided by the driver. If you experience similar issues, try to set `rss_offload` explicitly to override the default RSS offload configuration. ## FAQ diff --git a/src/plugins/input/dpdk/src/dpdk.cpp b/src/plugins/input/dpdk/src/dpdk.cpp index 55687e3b2..064cc121a 100644 --- a/src/plugins/input/dpdk/src/dpdk.cpp +++ b/src/plugins/input/dpdk/src/dpdk.cpp @@ -86,12 +86,14 @@ void DpdkCore::configure(const char* params) uint16_t rxQueueCount = parser.rx_queues(); m_mBufsCount = parser.pkt_buffer_size(); uint16_t mtuSize = parser.mtu_size(); + uint64_t rssOffload = parser.rss_offload(); configureEal(parser.eal_params()); m_dpdkDevices.reserve(parser.port_numbers().size()); for (auto portID : parser.port_numbers()) { - m_dpdkDevices.emplace_back(portID, rxQueueCount, mempoolSize, m_mBufsCount, mtuSize); + m_dpdkDevices + .emplace_back(portID, rxQueueCount, mempoolSize, m_mBufsCount, mtuSize, rssOffload); } isConfigured = true; diff --git a/src/plugins/input/dpdk/src/dpdk.hpp b/src/plugins/input/dpdk/src/dpdk.hpp index 7b2003841..2c5ea7db0 100644 --- a/src/plugins/input/dpdk/src/dpdk.hpp +++ b/src/plugins/input/dpdk/src/dpdk.hpp @@ -34,6 +34,7 @@ class DpdkOptParser : public OptionsParser { uint16_t rx_queues_ = 1; std::string eal_; uint16_t mtu_; + uint64_t rss_offload_ = 0; std::vector parsePortNumbers(std::string arg) { @@ -123,6 +124,20 @@ class DpdkOptParser : public OptionsParser { return true; }, RequiredArgument); + register_option( + "r", + "rss", + "VALUE", + "RSS offload value. 
Default: 0", + [this](const char* arg) { + try { + rss_offload_ = str2num(arg); + } catch (std::invalid_argument&) { + return false; + } + return true; + }, + RequiredArgument); register_option( "e", "eal", @@ -160,6 +175,8 @@ class DpdkOptParser : public OptionsParser { uint16_t rx_queues() const { return rx_queues_; } uint16_t mtu_size() const { return mtu_; } + + uint64_t rss_offload() const { return rss_offload_; } }; class DpdkCore { diff --git a/src/plugins/input/dpdk/src/dpdkDevice.cpp b/src/plugins/input/dpdk/src/dpdkDevice.cpp index c05ac890d..806ff9921 100644 --- a/src/plugins/input/dpdk/src/dpdkDevice.cpp +++ b/src/plugins/input/dpdk/src/dpdkDevice.cpp @@ -42,7 +42,8 @@ DpdkDevice::DpdkDevice( uint16_t rxQueueCount, uint16_t memPoolSize, uint16_t mbufsCount, - uint16_t mtuSize) + uint16_t mtuSize, + uint64_t rssOffload) : m_portID(portID) , m_rxQueueCount(rxQueueCount) , m_txQueueCount(0) @@ -51,13 +52,13 @@ DpdkDevice::DpdkDevice( , m_supportedRSS(false) , m_supportedHWTimestamp(false) , m_mtuSize(mtuSize) + , m_rssOffload(rssOffload) { validatePort(); recognizeDriver(); configurePort(); initMemPools(memPoolSize); setupRxQueues(memPoolSize); - configureRSS(); enablePort(); } @@ -95,9 +96,13 @@ void DpdkDevice::recognizeDriver() std::cerr << "\tflow type RSS offloads: " << rteDevInfo.flow_type_rss_offloads << std::endl; /* Check if RSS hashing is supported in NIC */ - m_supportedRSS = (rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP) != 0; - std::cerr << "\tDetected RSS offload capability: " << (m_supportedRSS ? "yes" : "no") - << std::endl; + if (m_rxQueueCount > 1) { + m_supportedRSS = (rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP) != 0; + std::cerr << "\tDetected RSS offload capability: " << (m_supportedRSS ? 
"yes" : "no") + << std::endl; + } else { + m_supportedRSS = false; + } /* Check if HW timestamps are supported, we support NFB cards only */ if (m_isNfbDpdkDriver) { @@ -154,7 +159,9 @@ rte_eth_conf DpdkDevice::createPortConfig() if (m_supportedRSS) { portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_RSS; + portConfig.rx_adv_conf.rss_conf = createRSSConfig(); } else { + std::cerr << "Skipped RSS hash setting for port " << m_portID << "." << std::endl; portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_NONE; } @@ -220,12 +227,9 @@ void DpdkDevice::setupRxQueues(uint16_t memPoolSize) << " set up. Size of each queue: " << rxQueueSize << std::endl; } -void DpdkDevice::configureRSS() +rte_eth_rss_conf DpdkDevice::createRSSConfig() { - if (!m_supportedRSS) { - std::cerr << "Skipped RSS hash setting for port " << m_portID << "." << std::endl; - return; - } + struct rte_eth_rss_conf rssConfig = {}; rte_eth_dev_info rteDevInfo; if (rte_eth_dev_info_get(m_portID, &rteDevInfo)) { @@ -243,23 +247,32 @@ void DpdkDevice::configureRSS() return hashKey[idx++ % sizeof(hashKey)]; }); - const uint64_t rssOffloads = rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP; - if (rssOffloads != RTE_ETH_RSS_IP) { - std::cerr << "RTE_ETH_RSS_IP is not supported by the card. Used subset: " << rssOffloads - << std::endl; + uint64_t rssOffloads = 0; + if (m_rssOffload) { // user specified RSS offload + rssOffloads = m_rssOffload; + } else { + if (std::string(rteDevInfo.driver_name) == "net_i40e") { + std::cerr << "RTE_ETH_RSS_IP is not supported reliably by this driver, falling back to " + "NIC-provided RSS: " + << rteDevInfo.flow_type_rss_offloads << std::endl; + std::cerr << "You can override this behavior using the 'rss' configuration parameter." + << std::endl; + rssOffloads = rteDevInfo.flow_type_rss_offloads; + } else { + rssOffloads = rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP; + if (rssOffloads != RTE_ETH_RSS_IP) { + std::cerr << "RTE_ETH_RSS_IP is not supported by the card. 
Used subset: " + << rssOffloads << std::endl; + } + } } - struct rte_eth_rss_conf rssConfig = {}; + std::cerr << "Using RSS offloads: " << rssOffloads << std::endl; + rssConfig.rss_key = m_hashKey.data(); rssConfig.rss_key_len = rssHashKeySize; rssConfig.rss_hf = rssOffloads; - - int ret = rte_eth_dev_rss_hash_update(m_portID, &rssConfig); - if (ret < 0) { - std::cerr << "Setting RSS {" << rssOffloads << "} for port " << m_portID - << " failed. Errno:" << ret << std::endl; - throw PluginError("DpdkDevice::configureRSS() has failed."); - } + return rssConfig; } void DpdkDevice::enablePort() diff --git a/src/plugins/input/dpdk/src/dpdkDevice.hpp b/src/plugins/input/dpdk/src/dpdkDevice.hpp index 3c203a086..c18ceece6 100644 --- a/src/plugins/input/dpdk/src/dpdkDevice.hpp +++ b/src/plugins/input/dpdk/src/dpdkDevice.hpp @@ -48,13 +48,15 @@ class DpdkDevice { * @param memPoolSize The size of the memory pool for packet buffers. * @param mbufsCount The number of mbufs (packet buffers) to be allocated. * @param mtuSize Maximum transmission unit of input interface. + * @param rssOffload RSS offload value. 0 for subset of RTE_ETH_RSS_IP. */ DpdkDevice( uint16_t portID, uint16_t rxQueueCount, uint16_t memPoolSize, uint16_t mbufsCount, - uint16_t mtuSize); + uint16_t mtuSize, + uint64_t rssOffload); /** * @brief Receives packets from the specified receive queue of the DPDK device. @@ -84,7 +86,7 @@ class DpdkDevice { rte_eth_conf createPortConfig(); void initMemPools(uint16_t memPoolSize); void setupRxQueues(uint16_t memPoolSize); - void configureRSS(); + rte_eth_rss_conf createRSSConfig(); void enablePort(); void createRteMempool(uint16_t mempoolSize); void setRxTimestampDynflag(); @@ -102,6 +104,7 @@ class DpdkDevice { int m_rxTimestampOffset; int m_rxTimestampDynflag; uint16_t m_mtuSize; + uint64_t m_rssOffload = 0; }; } // namespace ipxp