From ff070de413059fa214ee5aea3d6f14c8315107b1 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Sat, 21 Dec 2024 21:19:24 +0100 Subject: [PATCH] Refactor cache --- Makefile.am | 8 + include/ipfixprobe/packet.hpp | 14 +- include/ipfixprobe/storage.hpp | 10 +- input/parser.cpp | 3 +- storage/cache.cpp | 717 ++++++++++++--------------------- storage/cache.hpp | 361 +++-------------- storage/cacheOptParser.cpp | 101 +++++ storage/cacheOptParser.hpp | 32 ++ storage/cacheRowSpan.cpp | 42 ++ storage/cacheRowSpan.hpp | 21 + storage/cttController.cpp | 85 ++++ storage/cttController.hpp | 82 ++++ storage/flowKey.tpp | 23 ++ storage/flowRecord.cpp | 132 ++++++ storage/flowRecord.hpp | 32 ++ 15 files changed, 890 insertions(+), 773 deletions(-) create mode 100644 storage/cacheOptParser.cpp create mode 100644 storage/cacheOptParser.hpp create mode 100644 storage/cacheRowSpan.cpp create mode 100644 storage/cacheRowSpan.hpp create mode 100644 storage/cttController.cpp create mode 100644 storage/cttController.hpp create mode 100644 storage/flowKey.tpp create mode 100644 storage/flowRecord.cpp create mode 100644 storage/flowRecord.hpp diff --git a/Makefile.am b/Makefile.am index c980811d..5f9fedcf 100644 --- a/Makefile.am +++ b/Makefile.am @@ -70,6 +70,14 @@ ipfixprobe_storage_src=\ storage/fragmentationCache/fragmentationCache.cpp \ storage/cache.cpp \ storage/cache.hpp \ + storage/cacheOptParser.hpp \ + storage/cacheOptParser.cpp \ + storage/flowRecord.hpp \ + storage/flowRecord.cpp \ + storage/cttController.hpp \ + storage/cttController.cpp \ + storage/cacheRowSpan.hpp \ + storage/cacheRowSpan.cpp \ storage/xxhash.c \ storage/xxhash.h diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index 575c2fb9..90bfe4fc 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -31,6 +31,7 @@ #ifndef IPXP_PACKET_HPP #define IPXP_PACKET_HPP +//#define WITH_CTT 1 // TODO REMOVE #include #include @@ -46,10 +47,6 @@ namespace ipxp { * \brief Structure for storing parsed packet fields */ struct Packet : public Record { - #ifdef WITH_CTT - Metadata_CTT cttmeta; /**< Metadata from CTT */ - bool cttmeta_valid; /**< True if CTT metadata is valid */ - #endif /* WITH_CTT */ struct timeval ts; uint8_t dst_mac[6]; @@ -106,12 +103,16 @@ struct Packet : public Record { uint16_t buffer_size; /**< Size of buffer */ bool source_pkt; /**< Direction of packet from flow point of view */ +#ifdef WITH_CTT + Metadata_CTT cttmeta; /**< Metadata from CTT */ + bool cttmeta_valid; /**< True if CTT metadata is valid */ +#endif /* WITH_CTT */ /** * \brief Constructor. */ Packet() : - cttmeta_valid(false), ts({0}), + ts({0}), dst_mac(), src_mac(), ethertype(0), ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0), ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0), @@ -123,6 +124,9 @@ struct Packet : public Record { custom(nullptr), custom_len(0), buffer(nullptr), buffer_size(0), source_pkt(true) +#ifdef WITH_CTT + ,cttmeta_valid(false) +#endif /* WITH_CTT */ { } }; diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index 5296557e..f16bb59f 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -133,22 +133,22 @@ class StoragePlugin : public Plugin /** * \brief Checks if process plugins require all available data. - * \param [in] rec Stored flow record. + * \param [in] flow Stored flow record. * \return True if all data required, false otherwise. */ bool all_data_required(const Flow& flow) const noexcept { - return m_plugins_status.get_all_data.any(); + return flow.plugins_status.get_all_data.any(); } /** * \brief Checks if process plugins don't require any data. - * \param [in] rec Stored flow record. + * \param [in] flow Stored flow record. * \return True if no data required, false otherwise. */ bool no_data_required(const Flow& flow) const noexcept { - return m_plugins_status.get_no_data.all(); + return flow.plugins_status.get_no_data.all(); } /** @@ -190,9 +190,11 @@ class StoragePlugin : public Plugin int plugins_post_create(Flow& rec, const Packet& pkt) { // if metadata are valid, add flow hash ctt to the flow record +#ifdef WITH_CTT if (pkt.cttmeta_valid) { rec.flow_hash_ctt = pkt.cttmeta.flow_hash; } +#endif /* WITH_CTT */ PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; for (unsigned int i = 0; i < m_plugin_cnt; i++) { diff --git a/input/parser.cpp b/input/parser.cpp index ec5a2b5b..d5fded1c 100644 --- a/input/parser.cpp +++ b/input/parser.cpp @@ -777,6 +777,7 @@ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, cons opt->pblock->bytes += len; } +#ifdef WITH_CTT int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen) { if (opt->pblock->cnt >= opt->pblock->size) { @@ -892,5 +893,5 @@ int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metad opt->pblock->bytes += len; return 0; } - +#endif /* WITH_CTT */ } diff --git a/storage/cache.cpp b/storage/cache.cpp index af8171c4..99fdab69 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -38,6 +38,9 @@ #include #include "cache.hpp" + +#include +#include "cacheRowSpan.hpp" #include "xxhash.h" namespace ipxp { @@ -48,144 +51,55 @@ __attribute__((constructor)) static void register_this_plugin() register_plugin(&rec); } -FlowRecord::FlowRecord() -{ - erase(); -}; - -FlowRecord::~FlowRecord() -{ - erase(); -}; - -void FlowRecord::erase() +OptionsParser * NHTFlowCache::get_parser() const { - m_flow.remove_extensions(); - m_hash = 0; - - memset(&m_flow.time_first, 0, sizeof(m_flow.time_first)); - memset(&m_flow.time_last, 0, sizeof(m_flow.time_last)); - m_flow.ip_version = 0; - m_flow.ip_proto = 0; - memset(&m_flow.src_ip, 0, sizeof(m_flow.src_ip)); - memset(&m_flow.dst_ip, 0, sizeof(m_flow.dst_ip)); - m_flow.src_port = 0; - m_flow.dst_port = 0; - m_flow.src_packets = 0; - m_flow.dst_packets = 0; - m_flow.src_bytes = 0; - m_flow.dst_bytes = 0; - m_flow.src_tcp_flags = 0; - m_flow.dst_tcp_flags = 0; -} -void FlowRecord::reuse() -{ - m_flow.remove_extensions(); - m_flow.time_first = m_flow.time_last; - m_flow.src_packets = 0; - m_flow.dst_packets = 0; - m_flow.src_bytes = 0; - m_flow.dst_bytes = 0; - m_flow.src_tcp_flags = 0; - m_flow.dst_tcp_flags = 0; + return new CacheOptParser(); } -inline __attribute__((always_inline)) bool FlowRecord::is_empty() const +std::string NHTFlowCache::get_name() const noexcept { - return m_hash == 0; + return "cache"; } -inline __attribute__((always_inline)) bool FlowRecord::belongs(uint64_t hash) const +NHTFlowCache::NHTFlowCache() : + m_cache_size(0), m_line_size(0), m_line_mask(0), m_new_flow_insert_index(0), + m_queue_size(0), m_active(0), m_inactive(0), + m_split_biflow(false), m_enable_fragmentation_cache(true), + m_fragmentation_cache(0, 0) { - return hash == m_hash; } -void FlowRecord::create(const Packet &pkt, uint64_t hash) -{ - m_flow.src_packets = 1; - - m_hash = hash; - - m_flow.time_first = pkt.ts; - m_flow.time_last = pkt.ts; - m_flow.flow_hash = hash; - - memcpy(m_flow.src_mac, pkt.src_mac, 6); - memcpy(m_flow.dst_mac, pkt.dst_mac, 6); - - if (pkt.ip_version == IP::v4) { - m_flow.ip_version = pkt.ip_version; - m_flow.ip_proto = pkt.ip_proto; - m_flow.src_ip.v4 = pkt.src_ip.v4; - m_flow.dst_ip.v4 = pkt.dst_ip.v4; - m_flow.src_bytes = pkt.ip_len; - } else if (pkt.ip_version == IP::v6) { - m_flow.ip_version = pkt.ip_version; - m_flow.ip_proto = pkt.ip_proto; - memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); - memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); - m_flow.src_bytes = pkt.ip_len; - } - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - m_flow.src_tcp_flags = pkt.tcp_flags; - } else if (pkt.ip_proto == IPPROTO_UDP) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - } else if (pkt.ip_proto == IPPROTO_ICMP || - pkt.ip_proto == IPPROTO_ICMPV6) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - } - #ifdef WITH_CTT - m_flow.is_delayed = false; - m_delayed_flow_waiting = false; - #endif /* WITH_CTT */ -} - -void FlowRecord::update(const Packet &pkt, bool src) +NHTFlowCache::~NHTFlowCache() { - if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow - auto flow_hash = m_hash; - m_delayed_flow = m_flow; - m_delayed_flow_waiting = true; - erase(); // erase the old flow, keeping the delayed flow - create(pkt, flow_hash); - return; - } - m_flow.time_last = pkt.ts; - if (src) { - m_flow.src_packets++; - m_flow.src_bytes += pkt.ip_len; - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.src_tcp_flags |= pkt.tcp_flags; - } - } else { - m_flow.dst_packets++; - m_flow.dst_bytes += pkt.ip_len; - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.dst_tcp_flags |= pkt.tcp_flags; - } - } + close(); } - -NHTFlowCache::NHTFlowCache() : - m_cache_size(0), m_line_size(0), m_line_mask(0), m_line_new_idx(0), - m_qsize(0), m_qidx(0), m_timeout_idx(0), m_active(0), m_inactive(0), - m_split_biflow(false), m_enable_fragmentation_cache(true), m_keylen(0), - m_key(), m_key_inv(), m_flow_table(nullptr), m_flow_records(nullptr), - m_fragmentation_cache(0, 0) +void NHTFlowCache::get_parser_options(CacheOptParser& parser) noexcept { + m_cache_size = parser.m_cache_size; + m_line_size = parser.m_line_size; + m_active = parser.m_active; + m_inactive = parser.m_inactive; + m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); + m_new_flow_insert_index = m_line_size / 2; + m_split_biflow = parser.m_split_biflow; + m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; +#ifdef WITH_CTT + m_ctt_controller.init(parser.m_dev, 0); +#endif /* WITH_CTT */ } -NHTFlowCache::~NHTFlowCache() +void NHTFlowCache::allocate_table() { - close(); + try { + m_flow_table.resize(m_cache_size + m_queue_size); + m_flows.resize(m_cache_size + m_queue_size); + std::for_each(m_flow_table.begin(), m_flow_table.end(), [index = 0, this](FlowRecord*& flow) mutable { + flow = &m_flows[index++]; + }); + } catch (std::bad_alloc &e) { + throw PluginError("not enough memory for flow cache allocation"); + } } void NHTFlowCache::init(const char *params) @@ -197,41 +111,17 @@ void NHTFlowCache::init(const char *params) throw PluginError(e.what()); } - m_cache_size = parser.m_cache_size; - m_line_size = parser.m_line_size; - m_active = parser.m_active; - m_inactive = parser.m_inactive; - m_qidx = 0; - m_timeout_idx = 0; - m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); - m_line_new_idx = m_line_size / 2; - #ifdef WITH_CTT - m_ctt_controller.init(parser.m_dev, 0); - #endif /* WITH_CTT */ - + get_parser_options(parser); if (m_export_queue == nullptr) { throw PluginError("output queue must be set before init"); } - if (m_line_size > m_cache_size) { throw PluginError("flow cache line size must be greater or equal to cache size"); } if (m_cache_size == 0) { throw PluginError("flow cache won't properly work with 0 records"); } - - try { - m_flow_table = new FlowRecord*[m_cache_size + m_qsize]; - m_flow_records = new FlowRecord[m_cache_size + m_qsize]; - for (decltype(m_cache_size + m_qsize) i = 0; i < m_cache_size + m_qsize; i++) { - m_flow_table[i] = m_flow_records + i; - } - } catch (std::bad_alloc &e) { - throw PluginError("not enough memory for flow cache allocation"); - } - - m_split_biflow = parser.m_split_biflow; - m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; + allocate_table(); if (m_enable_fragmentation_cache) { try { @@ -240,39 +130,28 @@ void NHTFlowCache::init(const char *params) throw PluginError("not enough memory for fragment cache allocation"); } } - -#ifdef FLOW_CACHE_STATS - m_empty = 0; - m_not_empty = 0; - m_hits = 0; - m_expired = 0; - m_flushed = 0; - m_lookups = 0; - m_lookups2 = 0; -#endif /* FLOW_CACHE_STATS */ } void NHTFlowCache::close() { - if (m_flow_records != nullptr) { - delete [] m_flow_records; - m_flow_records = nullptr; - } - if (m_flow_table != nullptr) { - delete [] m_flow_table; - m_flow_table = nullptr; - } + m_flows.clear(); + m_flow_table.clear(); } void NHTFlowCache::set_queue(ipx_ring_t *queue) { m_export_queue = queue; - m_qsize = ipx_ring_size(queue); + m_queue_size = ipx_ring_size(queue); +} + +void NHTFlowCache::export_flow(size_t flow_index) +{ + export_flow(flow_index, get_export_reason(m_flow_table[flow_index]->m_flow)); } -void NHTFlowCache::export_flow(size_t index) +void NHTFlowCache::export_flow(size_t flow_index, int reason) { - if (m_flow_table[index]->m_flow.is_delayed) { + /*if (m_flow_table[index]->m_flow.is_delayed) { return; } if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { @@ -288,12 +167,18 @@ void NHTFlowCache::export_flow(size_t index) update_flow_record_stats( m_flow_table[index]->m_flow.src_packets + m_flow_table[index]->m_flow.dst_packets); - m_flows_in_cache--; - - ipx_ring_push(m_export_queue, &m_flow_table[index]->m_flow); - std::swap(m_flow_table[index], m_flow_table[m_cache_size + m_qidx]); - m_flow_table[index]->erase(); - m_qidx = (m_qidx + 1) % m_qsize; + m_flows_in_cache--;*/ + m_flow_table[flow_index]->m_flow.end_reason = reason; + m_cache_stats.expired++; + push_to_export_queue(flow_index); + m_flow_table[flow_index]->erase(); +} + +void NHTFlowCache::push_to_export_queue(size_t flow_index) noexcept +{ + ipx_ring_push(m_export_queue, &m_flow_table[flow_index]->m_flow); + std::swap(m_flow_table[flow_index], m_flow_table[m_cache_size + m_queue_index]); + m_queue_index = (m_queue_index + 1) % m_queue_size; } void NHTFlowCache::finish() @@ -301,206 +186,186 @@ void NHTFlowCache::finish() for (decltype(m_cache_size) i = 0; i < m_cache_size; i++) { if (!m_flow_table[i]->is_empty()) { plugins_pre_export(m_flow_table[i]->m_flow); - m_flow_table[i]->m_flow.end_reason = FLOW_END_FORCED; - export_flow(i); -#ifdef FLOW_CACHE_STATS - m_expired++; -#endif /* FLOW_CACHE_STATS */ + //m_flow_table[i]->m_flow.end_reason = FLOW_END_FORCED; + export_flow(i, FLOW_END_FORCED); + //m_cache_stats.expired++; } } } -void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int ret, bool source_flow) +void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int status, bool source_flow) { -#ifdef FLOW_CACHE_STATS - m_flushed++; -#endif /* FLOW_CACHE_STATS */ + m_cache_stats.flushed++; - if (ret == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { - FlowRecord *flow = m_flow_table[flow_index]; - flow->m_flow.end_reason = FLOW_END_FORCED; - ipx_ring_push(m_export_queue, &flow->m_flow); + if (status == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { + //FlowRecord *flow = m_flow_table[flow_index]; + //export_flow(flow_index, FLOW_END_FORCED); + push_to_export_queue(flow_index); + //flow->m_flow.end_reason = FLOW_END_FORCED; + //ipx_ring_push(m_export_queue, &flow->m_flow); + //std::swap(m_flow_table[flow_index], m_flow_table[m_cache_size + m_queue_index]); + //flow = m_flow_table[flow_index]; - std::swap(m_flow_table[flow_index], m_flow_table[m_cache_size + m_qidx]); + m_flow_table[flow_index]->m_flow.remove_extensions(); + *m_flow_table[flow_index] = *m_flow_table[m_cache_size + m_queue_index]; + //m_queue_index = (m_queue_index + 1) % m_queue_size; - flow = m_flow_table[flow_index]; - flow->m_flow.remove_extensions(); - *flow = *m_flow_table[m_cache_size + m_qidx]; - m_qidx = (m_qidx + 1) % m_qsize; + m_flow_table[flow_index]->m_flow.m_exts = nullptr; + m_flow_table[flow_index]->reuse(); // Clean counters, set time first to last + m_flow_table[flow_index]->update(pkt, source_flow); // Set new counters from packet - flow->m_flow.m_exts = nullptr; - flow->reuse(); // Clean counters, set time first to last - flow->update(pkt, source_flow); // Set new counters from packet - - ret = plugins_post_create(flow->m_flow, pkt); - if (ret & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, ret, source_flow); + const size_t post_create_return_flags = plugins_post_create(m_flow_table[flow_index]->m_flow, pkt); + if (post_create_return_flags & ProcessPlugin::FlowAction::FLUSH) { + flush(pkt, flow_index, post_create_return_flags, source_flow); } } else { - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; - export_flow(flow_index); + //m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; + export_flow(flow_index, FLOW_END_FORCED); } } -int NHTFlowCache::put_pkt(Packet &pkt) +std::tuple, std::optional, bool> NHTFlowCache::find_flow_index(const Packet& packet) noexcept { - int ret = plugins_pre_create(pkt); + if (!create_hash_key(packet)) { + return {std::nullopt, std::nullopt, false}; + } - if (m_enable_fragmentation_cache) { - try_to_fill_ports_to_fragmented_packet(pkt); + const auto key_hasher = [](const auto& key) + { + return XXH64(&key, sizeof(key), 0); + }; + + const size_t direct_hash_value = std::visit(key_hasher, m_key); + const size_t first_flow_in_raw = direct_hash_value & m_line_mask; + const CacheRowSpan raw_span_direct(&m_flow_table[first_flow_in_raw], m_line_size); + std::optional flow_index = raw_span_direct.find_by_hash(direct_hash_value); + if (flow_index.has_value()) { + return {direct_hash_value, flow_index, true}; } - if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key and NHTFlowCache::m_keylen - return 0; + const size_t reversed_hash_value = std::visit(key_hasher, m_key_reversed); + const size_t first_flow_in_raw_reversed = direct_hash_value & m_line_mask; + const CacheRowSpan raw_span_reverse(&m_flow_table[first_flow_in_raw_reversed], m_line_size); + flow_index = raw_span_reverse.find_by_hash(direct_hash_value); + if (flow_index.has_value()) { + return {reversed_hash_value, flow_index, false}; } - prefetch_export_expired(); + return {direct_hash_value, std::nullopt, false}; +} - uint64_t hashval = XXH64(m_key, m_keylen, 0); /* Calculates hash value from key created before. */ +static bool isTcpConnectionRestart(const Packet& packet, const Flow& flow, bool source_to_destination) noexcept +{ + constexpr uint8_t TCP_FIN = 0x01; + constexpr uint8_t TCP_RST = 0x04; + constexpr uint8_t TCP_SYN = 0x02; + const uint8_t flags = source_to_destination ? flow.src_tcp_flags : flow.dst_tcp_flags; + return packet.tcp_flags & TCP_SYN && (flags & (TCP_FIN | TCP_RST)); +} - FlowRecord *flow; /* Pointer to flow we will be working with. */ - bool found = false; - bool source_flow = true; - uint32_t line_index = hashval & m_line_mask; /* Get index of flow line. */ - uint32_t flow_index = 0; - uint32_t next_line = line_index + m_line_size; +bool NHTFlowCache::export_on_inactive_timeout(size_t flow_index, time_t ts) noexcept +{ + if (ts - m_flow_table[flow_index]->m_flow.time_last.tv_sec >= m_inactive) { + plugins_pre_export(m_flow_table[flow_index]->m_flow); + export_flow(flow_index); + return true; + } + return false; +} - /* Find existing flow record in flow cache. */ - for (flow_index = line_index; flow_index < next_line; flow_index++) { - if (m_flow_table[flow_index]->belongs(hashval)) { - found = true; - break; - } +bool NHTFlowCache::export_on_active_timeout(size_t flow_index, time_t ts) noexcept +{ + if (ts - m_flow_table[flow_index]->m_flow.time_first.tv_sec >= m_active) { + m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; + plugins_pre_export(m_flow_table[flow_index]->m_flow); + export_flow(flow_index); + return true; } + return false; +} - /* Find inversed flow. */ - if (!found && !m_split_biflow) { - uint64_t hashval_inv = XXH64(m_key_inv, m_keylen, 0); - uint64_t line_index_inv = hashval_inv & m_line_mask; - uint64_t next_line_inv = line_index_inv + m_line_size; - for (flow_index = line_index_inv; flow_index < next_line_inv; flow_index++) { - if (m_flow_table[flow_index]->belongs(hashval_inv)) { - found = true; - source_flow = false; - hashval = hashval_inv; - line_index = line_index_inv; - break; - } - } +int NHTFlowCache::put_pkt(Packet &pkt) +{ + plugins_pre_create(pkt); + + if (m_enable_fragmentation_cache) { + try_to_fill_ports_to_fragmented_packet(pkt); + } + + auto [hash_value, flow_index, source_to_destination] = find_flow_index(pkt); + const bool hash_created = hash_value.has_value(); + const bool flow_found = flow_index.has_value(); + if (!hash_created) { + return 0; } + const size_t row_begin = hash_value.value() & m_line_mask; + CacheRowSpan row_span(&m_flow_table[row_begin], m_line_size); - if (found) { + prefetch_export_expired(); + + if (flow_found) { /* Existing flow record was found, put flow record at the first index of flow line. */ -#ifdef FLOW_CACHE_STATS - m_lookups += (flow_index - line_index + 1); - m_lookups2 += (flow_index - line_index + 1) * (flow_index - line_index + 1); -#endif /* FLOW_CACHE_STATS */ - - flow = m_flow_table[flow_index]; - for (decltype(flow_index) j = flow_index; j > line_index; j--) { - m_flow_table[j] = m_flow_table[j - 1]; - } + m_cache_stats.lookups += (flow_index.value() - row_begin + 1); + m_cache_stats.lookups2 += (flow_index.value() - row_begin + 1) * (flow_index.value() - row_begin + 1); + m_cache_stats.hits++; - m_flow_table[line_index] = flow; - flow_index = line_index; -#ifdef FLOW_CACHE_STATS - m_hits++; -#endif /* FLOW_CACHE_STATS */ + row_span.advance_flow(flow_index.value()); } else { /* Existing flow record was not found. Find free place in flow line. */ - for (flow_index = line_index; flow_index < next_line; flow_index++) { - if (m_flow_table[flow_index]->is_empty()) { - found = true; - break; - } - } - if (!found) { - /* If free place was not found (flow line is full), find - * record which will be replaced by new record. */ - flow_index = next_line - 1; - - // Export flow - plugins_pre_export(m_flow_table[flow_index]->m_flow); - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_NO_RES; - export_flow(flow_index); - -#ifdef FLOW_CACHE_STATS - m_expired++; -#endif /* FLOW_CACHE_STATS */ - uint32_t flow_new_index = line_index + m_line_new_idx; - flow = m_flow_table[flow_index]; - for (decltype(flow_index) j = flow_index; j > flow_new_index; j--) { - m_flow_table[j] = m_flow_table[j - 1]; - } - flow_index = flow_new_index; - m_flow_table[flow_new_index] = flow; -#ifdef FLOW_CACHE_STATS - m_not_empty++; + const std::optional empty_index = row_span.find_empty(); + const bool empty_found = empty_index.has_value(); + if (empty_found) { + flow_index = empty_index.value() + row_begin; + m_cache_stats.empty++; } else { - m_empty++; -#endif /* FLOW_CACHE_STATS */ + row_span.advance_flow_to(m_line_size - 1, m_new_flow_insert_index); + flow_index = row_begin + m_new_flow_insert_index; + plugins_pre_export(m_flow_table[flow_index.value()]->m_flow); + m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_NO_RES; + export_flow(flow_index.value()); + m_cache_stats.expired++; + m_cache_stats.not_empty++; } } - pkt.source_pkt = source_flow; - flow = m_flow_table[flow_index]; - - uint8_t flw_flags = source_flow ? flow->m_flow.src_tcp_flags : flow->m_flow.dst_tcp_flags; - if ((pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { - // Flows with FIN or RST TCP flags are exported when new SYN packet arrives - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_EOF; - export_flow(flow_index); + pkt.source_pkt = source_to_destination; + if (isTcpConnectionRestart(pkt, m_flow_table[flow_index.value()]->m_flow, source_to_destination)) { + //m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_EOF; + export_flow(flow_index.value(), FLOW_END_EOF); put_pkt(pkt); return 0; } - if (flow->is_empty()) { - m_flows_in_cache++; - flow->create(pkt, hashval); - ret = plugins_post_create(flow->m_flow, pkt); - - if (ret & ProcessPlugin::FlowAction::FLUSH) { - export_flow(flow_index); -#ifdef FLOW_CACHE_STATS - m_flushed++; -#endif /* FLOW_CACHE_STATS */ - } - } else { - /* Check if flow record is expired (inactive timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive) { - m_flow_table[flow_index]->m_flow.end_reason = get_export_reason(flow->m_flow); - plugins_pre_export(flow->m_flow); - export_flow(flow_index); - #ifdef FLOW_CACHE_STATS - m_expired++; - #endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); + if (m_flow_table[flow_index.value()]->is_empty()) { + m_cache_stats.flows_in_cache++; + m_flow_table[flow_index.value()]->create(pkt, hash_value.value()); + if (plugins_post_create(m_flow_table[flow_index.value()]->m_flow, pkt) & ProcessPlugin::FlowAction::FLUSH) { + export_flow(flow_index.value()); + m_cache_stats.flushed++; } + export_expired(pkt.ts.tv_sec); + return 0; + } + /* Check if flow record is expired (inactive timeout). */ + if (export_on_inactive_timeout(flow_index.value(), pkt.ts.tv_sec)) { + return put_pkt(pkt); + } - /* Check if flow record is expired (active timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active) { - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; - plugins_pre_export(flow->m_flow); - export_flow(flow_index); -#ifdef FLOW_CACHE_STATS - m_expired++; -#endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); - } + if (export_on_active_timeout(flow_index.value(), pkt.ts.tv_sec)) { + return put_pkt(pkt); + } - ret = plugins_pre_update(flow->m_flow, pkt); - if (ret & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, ret, source_flow); - return 0; - } else { - flow->update(pkt, source_flow); - ret = plugins_post_update(flow->m_flow, pkt); + const size_t pre_update_return_flags = plugins_pre_update(m_flow_table[flow_index.value()]->m_flow, pkt); + if (pre_update_return_flags & ProcessPlugin::FlowAction::FLUSH) { + flush(pkt, flow_index.value(), pre_update_return_flags, source_to_destination); + return 0; + } + m_flow_table[flow_index.value()]->update(pkt, source_to_destination); + const size_t post_update_return_flags = plugins_post_update(m_flow_table[flow_index.value()]->m_flow, pkt); - if (ret & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, ret, source_flow); - return 0; - } - } + if (post_update_return_flags & ProcessPlugin::FlowAction::FLUSH) { + flush(pkt, flow_index.value(), post_update_return_flags, source_to_destination); + return 0; } export_expired(pkt.ts.tv_sec); @@ -512,9 +377,11 @@ void NHTFlowCache::try_to_fill_ports_to_fragmented_packet(Packet& packet) m_fragmentation_cache.process_packet(packet); } -uint8_t NHTFlowCache::get_export_reason(Flow &flow) +uint8_t NHTFlowCache::get_export_reason(const Flow& flow) { - if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (0x01 | 0x04)) { + constexpr uint8_t TCP_FIN = 0x01; + constexpr uint8_t TCP_RST = 0x04; + if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (TCP_FIN | TCP_RST)) { // When FIN or RST is set, TCP connection ended naturally return FLOW_END_EOF; } else { @@ -524,95 +391,81 @@ uint8_t NHTFlowCache::get_export_reason(Flow &flow) void NHTFlowCache::export_expired(time_t ts) { - for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { + for (decltype(m_last_exported_on_timeout_index) i = m_last_exported_on_timeout_index; i < m_last_exported_on_timeout_index + m_new_flow_insert_index; i++) { if (!m_flow_table[i]->is_empty() && ts - m_flow_table[i]->m_flow.time_last.tv_sec >= m_inactive) { m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); - if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { - m_flow_table[i]->m_flow.is_delayed = false; - plugins_pre_export(m_flow_table[i]->m_flow); - export_flow(i); - } - if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { - m_flow_table[i]->m_delayed_flow_waiting = false; - plugins_pre_export(m_flow_table[i]->m_delayed_flow); - export_flow(i); - } -#ifdef FLOW_CACHE_STATS - m_expired++; -#endif /* FLOW_CACHE_STATS */ + /*if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + }*/ + m_cache_stats.expired++; } } - m_timeout_idx = (m_timeout_idx + m_line_new_idx) & (m_cache_size - 1); + m_last_exported_on_timeout_index = (m_last_exported_on_timeout_index + m_new_flow_insert_index) & (m_cache_size - 1); } -bool NHTFlowCache::create_hash_key(Packet &pkt) +template +static std::array pointerToByteArray(const Type* pointer) noexcept { - if (pkt.ip_version == IP::v4) { - struct flow_key_v4_t *key_v4 = reinterpret_cast(m_key); - struct flow_key_v4_t *key_v4_inv = reinterpret_cast(m_key_inv); - - key_v4->proto = pkt.ip_proto; - key_v4->ip_version = IP::v4; - key_v4->src_port = pkt.src_port; - key_v4->dst_port = pkt.dst_port; - key_v4->src_ip = pkt.src_ip.v4; - key_v4->dst_ip = pkt.dst_ip.v4; - key_v4->vlan_id = pkt.vlan_id; + std::array res; + std::copy_n(reinterpret_cast(pointer), ArraySize, res.begin()); + return res; +} - key_v4_inv->proto = pkt.ip_proto; - key_v4_inv->ip_version = IP::v4; - key_v4_inv->src_port = pkt.dst_port; - key_v4_inv->dst_port = pkt.src_port; - key_v4_inv->src_ip = pkt.dst_ip.v4; - key_v4_inv->dst_ip = pkt.src_ip.v4; - key_v4_inv->vlan_id = pkt.vlan_id; +template +static const uint8_t* scalarToArrayEnd(const ScalarType& scalar) noexcept +{ + return scalarToArrayBegin(scalar) + sizeof(scalar); +} - m_keylen = sizeof(flow_key_v4_t); +bool NHTFlowCache::create_hash_key(const Packet& packet) +{ + if (packet.ip_version == IP::v4) { + m_key = FlowKeyv4{ packet.src_port, packet.dst_port, packet.ip_proto, IP::v4, + pointerToByteArray(&packet.src_ip.v4), + pointerToByteArray(&packet.dst_ip.v4), + static_cast(packet.vlan_id)}; + m_key_reversed = FlowKeyv4{ packet.dst_port, packet.src_port, packet.ip_proto, IP::v4, + pointerToByteArray(&packet.dst_ip.v4), + pointerToByteArray(&packet.src_ip.v4), + static_cast(packet.vlan_id)}; return true; - } else if (pkt.ip_version == IP::v6) { - struct flow_key_v6_t *key_v6 = reinterpret_cast(m_key); - struct flow_key_v6_t *key_v6_inv = reinterpret_cast(m_key_inv); - - key_v6->proto = pkt.ip_proto; - key_v6->ip_version = IP::v6; - key_v6->src_port = pkt.src_port; - key_v6->dst_port = pkt.dst_port; - memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); - memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); - key_v6->vlan_id = pkt.vlan_id; - - key_v6_inv->proto = pkt.ip_proto; - key_v6_inv->ip_version = IP::v6; - key_v6_inv->src_port = pkt.dst_port; - key_v6_inv->dst_port = pkt.src_port; - memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); - memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); - key_v6_inv->vlan_id = pkt.vlan_id; - - m_keylen = sizeof(flow_key_v6_t); + } + if (packet.ip_version == IP::v6) { + m_key = FlowKeyv6{ packet.src_port, packet.dst_port, packet.ip_proto, IP::v6, + pointerToByteArray(packet.src_ip.v6), + pointerToByteArray(packet.dst_ip.v6), + static_cast(packet.vlan_id)}; + m_key_reversed = FlowKeyv6{ packet.dst_port, packet.src_port, packet.ip_proto, IP::v6, + pointerToByteArray(packet.dst_ip.v6), + pointerToByteArray(packet.src_ip.v6), + static_cast(packet.vlan_id)}; return true; } - return false; } -#ifdef FLOW_CACHE_STATS void NHTFlowCache::print_report() { - float tmp = float(m_lookups) / m_hits; + /*1float tmp = float(m_cache_stats.lookups) / m_cache_stats.hits; - cout << "Hits: " << m_hits << endl; - cout << "Empty: " << m_empty << endl; - cout << "Not empty: " << m_not_empty << endl; - cout << "Expired: " << m_expired << endl; - cout << "Flushed: " << m_flushed << endl; + cout << "Hits: " << m_cache_stats.hits << endl; + cout << "Empty: " << m_cache_stats.empty << endl; + cout << "Not empty: " << m_cache_stats.not_empty << endl; + cout << "Expired: " << m_cache_stats.expired << endl; + cout << "Flushed: " << m_cache_stats.flushed << endl; cout << "Average Lookup: " << tmp << endl; - cout << "Variance Lookup: " << float(m_lookups2) / m_hits - tmp * tmp << endl; + cout << "Variance Lookup: " << float(m_cache_stats.lookups2) / m_cache_stats.hits - tmp * tmp << endl;*/ } -#endif /* FLOW_CACHE_STATS */ void NHTFlowCache::set_telemetry_dir(std::shared_ptr dir) { @@ -674,8 +527,8 @@ telemetry::Content NHTFlowCache::get_cache_telemetry() dict["FlowEndReason:Collision"] = m_flow_end_reason_stats.collision; dict["FlowEndReason:Forced"] = m_flow_end_reason_stats.forced; - dict["FlowsInCache"] = m_flows_in_cache; - dict["FlowCacheUsage"] = telemetry::ScalarWithUnit {double(m_flows_in_cache) / m_cache_size * 100, "%"}; + dict["FlowsInCache"] = m_cache_stats.flows_in_cache; + dict["FlowCacheUsage"] = telemetry::ScalarWithUnit {double(m_cache_stats.flows_in_cache) / m_cache_size * 100, "%"}; dict["FlowRecordStats:1packet"] = m_flow_record_stats.packets_count_1; dict["FlowRecordStats:2-5packets"] = m_flow_record_stats.packets_count_2_5; @@ -684,72 +537,16 @@ telemetry::Content NHTFlowCache::get_cache_telemetry() dict["FlowRecordStats:21-50packets"] = m_flow_record_stats.packets_count_21_50; dict["FlowRecordStats:51-plusPackets"] = m_flow_record_stats.packets_count_51_plus; - dict["TotalExportedFlows"] = m_total_exported; + dict["TotalExportedFlows"] = m_cache_stats.total_exported; return dict; } void NHTFlowCache::prefetch_export_expired() const { - for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { + for (decltype(m_last_exported_on_timeout_index) i = m_last_exported_on_timeout_index; i < m_last_exported_on_timeout_index + m_new_flow_insert_index; i++) { __builtin_prefetch(m_flow_table[i], 0, 1); } } -#ifdef WITH_CTT - -void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) -{ - try { - std::vector key = assemble_key(flow_hash_ctt); - std::vector state = assemble_state( - OffloadMode::PACKET_OFFLOAD, - MetaType::FULL, - ts); - m_commander->write_record(std::move(key), std::move(state)); - } - catch (const std::exception& e) { - throw; - } -} - -void CttController::export_record(uint64_t flow_hash_ctt) -{ - try { - std::vector key = assemble_key(flow_hash_ctt); - m_commander->export_and_delete_record(std::move(key)); - } - catch (const std::exception& e) { - throw; - } -} - -std::vector CttController::assemble_key(uint64_t flow_hash_ctt) -{ - std::vector key(key_size_bytes, std::byte(0)); - for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { - key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); - } - return key; -} - -std::vector CttController::assemble_state( - OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) -{ - std::vector state(state_size_bytes, std::byte(0)); - std::vector state_mask(state_mask_size_bytes, std::byte(0)); - - state[0] = static_cast(offload_mode); - state[1] = static_cast(meta_type); - - // timestamp in sec/ns format, 32+32 bits - 64 bits in total - for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { - state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); - } - for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { - state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); - } - return state; -} -#endif // WITH_CTT } \ No newline at end of file diff --git a/storage/cache.hpp b/storage/cache.hpp index ad8de024..f3692330 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -38,109 +38,24 @@ #include #include -#include +//#include +#include #include -#include #include #include "fragmentationCache/fragmentationCache.hpp" -#ifdef WITH_CTT -#include -#include -#include -#include -#include -#include -#include -#include -#endif /* WITH_CTT */ - -namespace ipxp { - -#ifdef WITH_CTT - -class CttController { -public: - enum class OffloadMode : uint8_t { - NO_OFFLOAD = 0x0, - PACKET_OFFLOAD = 0x1, - META_EXPORT = 0x2, - PACKET_OFFLOAD_WITH_EXPORT = 0x3 - }; - enum class MetaType : uint8_t { - FULL = 0x0, - HALF = 0x1, - TS_ONLY = 0x2, - NO_META = 0x3 - }; - /** - * @brief init the CTT. - * - * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). - * @param ctt_comp_index The index of the CTT component. - */ - void init(const std::string& nfb_dev, unsigned ctt_comp_index) { - m_commander = std::make_unique(ctt::NfbParams{nfb_dev, ctt_comp_index}); - try { - // Get UserInfo to determine key, state, and state_mask sizes - ctt::UserInfo user_info = m_commander->get_user_info(); - key_size_bytes = (user_info.key_bit_width + 7) / 8; - state_size_bytes = (user_info.state_bit_width + 7) / 8; - state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; +#include "cacheOptParser.hpp" +#include "flowKey.tpp" +#include "flowRecord.hpp" +#include "cttController.hpp" - // Enable the CTT - std::future enable_future = m_commander->enable(true); - enable_future.wait(); - } - catch (const std::exception& e) { - throw; - } - } - /** - * @brief Command: mark a flow for offload. - * - * @param flow_hash_ctt The flow hash to be offloaded. - */ - void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); +namespace ipxp { - /** - * @brief Command: export a flow from the CTT. - * - * @param flow_hash_ctt The flow hash to be exported. - */ - void export_record(uint64_t flow_hash_ctt); -private: - std::unique_ptr m_commander; - size_t key_size_bytes; - size_t state_size_bytes; - size_t state_mask_size_bytes; - /** - * @brief Assembles the state vector from the given values. - * - * @param offload_mode The offload mode. - * @param meta_type The metadata type. - * @param timestamp_first The first timestamp of the flow. - * @return A byte vector representing the assembled state vector. - */ - std::vector assemble_state( - OffloadMode offload_mode, MetaType meta_type, - const struct timeval& timestamp_first); - - /** - * @brief Assembles the key vector from the given flow hash. - * - * @param flow_hash_ctt The flow hash. - * @return A byte vector representing the assembled key vector. - */ - std::vector assemble_key(uint64_t flow_hash_ctt); -}; -#endif /* WITH_CTT */ - -struct __attribute__((packed)) flow_key_v4_t { +/*struct __attribute__((packed)) flow_key_v4_t { uint16_t src_port; uint16_t dst_port; uint8_t proto; @@ -158,138 +73,10 @@ struct __attribute__((packed)) flow_key_v6_t { uint8_t src_ip[16]; uint8_t dst_ip[16]; uint16_t vlan_id; -}; - -#define MAX_KEY_LENGTH (max(sizeof(flow_key_v4_t), sizeof(flow_key_v6_t))) - -#ifdef IPXP_FLOW_CACHE_SIZE -static const uint32_t DEFAULT_FLOW_CACHE_SIZE = IPXP_FLOW_CACHE_SIZE; -#else -static const uint32_t DEFAULT_FLOW_CACHE_SIZE = 17; // 131072 records total -#endif /* IPXP_FLOW_CACHE_SIZE */ - -#ifdef IPXP_FLOW_LINE_SIZE -static const uint32_t DEFAULT_FLOW_LINE_SIZE = IPXP_FLOW_LINE_SIZE; -#else -static const uint32_t DEFAULT_FLOW_LINE_SIZE = 4; // 16 records per line -#endif /* IPXP_FLOW_LINE_SIZE */ - -static const uint32_t DEFAULT_INACTIVE_TIMEOUT = 30; -static const uint32_t DEFAULT_ACTIVE_TIMEOUT = 300; - -static_assert(std::is_unsigned(), "Static checks of default cache sizes won't properly work without unsigned type."); -static_assert(bitcount(-1) > DEFAULT_FLOW_CACHE_SIZE, "Flow cache size is too big to fit in variable!"); -static_assert(bitcount(-1) > DEFAULT_FLOW_LINE_SIZE, "Flow cache line size is too big to fit in variable!"); - -static_assert(DEFAULT_FLOW_LINE_SIZE >= 1, "Flow cache line size must be at least 1!"); -static_assert(DEFAULT_FLOW_CACHE_SIZE >= DEFAULT_FLOW_LINE_SIZE, "Flow cache size must be at least cache line size!"); - -class CacheOptParser : public OptionsParser -{ -public: - uint32_t m_cache_size; - uint32_t m_line_size; - uint32_t m_active; - uint32_t m_inactive; - bool m_split_biflow; - bool m_enable_fragmentation_cache; - std::size_t m_frag_cache_size; - time_t m_frag_cache_timeout; - #ifdef WITH_CTT - std::string m_dev; - #endif /* WITH_CTT */ - - CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"), - m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE), - m_active(DEFAULT_ACTIVE_TIMEOUT), m_inactive(DEFAULT_INACTIVE_TIMEOUT), m_split_biflow(false), - m_enable_fragmentation_cache(true), m_frag_cache_size(10007), // Prime for better distribution in hash table - m_frag_cache_timeout(3) - { - register_option("s", "size", "EXPONENT", "Cache size exponent to the power of two", - [this](const char *arg){try {unsigned exp = str2num(arg); - if (exp < 4 || exp > 30) { - throw PluginError("Flow cache size must be between 4 and 30"); - } - m_cache_size = static_cast(1) << exp; - } catch(std::invalid_argument &e) {return false;} return true;}, - OptionFlags::RequiredArgument); - register_option("l", "line", "EXPONENT", "Cache line size exponent to the power of two", - [this](const char *arg){try {m_line_size = static_cast(1) << str2num(arg); - if (m_line_size < 1) { - throw PluginError("Flow cache line size must be at least 1"); - } - } catch(std::invalid_argument &e) {return false;} return true;}, - OptionFlags::RequiredArgument); - register_option("a", "active", "TIME", "Active timeout in seconds", - [this](const char *arg){try {m_active = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, - OptionFlags::RequiredArgument); - register_option("i", "inactive", "TIME", "Inactive timeout in seconds", - [this](const char *arg){try {m_inactive = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, - OptionFlags::RequiredArgument); - register_option("S", "split", "", "Split biflows into uniflows", - [this](const char *arg){ m_split_biflow = true; return true;}, OptionFlags::NoArgument); - register_option("fe", "frag-enable", "true|false", "Enable/disable fragmentation cache. Enabled (true) by default.", - [this](const char *arg){ - if (strcmp(arg, "true") == 0) { - m_enable_fragmentation_cache = true; - } else if (strcmp(arg, "false") == 0) { - m_enable_fragmentation_cache = false; - } else { - return false; - } - return true; - }, OptionFlags::RequiredArgument); - register_option("fs", "frag-size", "size", "Size of fragmentation cache, must be at least 1. Default value is 10007.", [this](const char *arg) { - try { - m_frag_cache_size = str2num(arg); - } catch(std::invalid_argument &e) { - return false; - } - return m_frag_cache_size > 0; - }); - register_option("ft", "frag-timeout", "TIME", "Timeout of fragments in fragmentation cache in seconds. Default value is 3.", [this](const char *arg) { - try { - m_frag_cache_timeout = str2num(arg); - } catch(std::invalid_argument &e) { - return false; - } - return true; - }); - - #ifdef WITH_CTT - register_option("d", "dev", "DEV", "Device name", - [this](const char *arg) { - m_dev = arg; - return true; - }, - OptionFlags::RequiredArgument); - #endif /* WITH_CTT */ - - } -}; - -class alignas(64) FlowRecord -{ - uint64_t m_hash; - -public: - Flow m_flow; - #ifdef WITH_CTT - Flow m_delayed_flow; - bool m_delayed_flow_waiting; - #endif /* WITH_CTT */ - - FlowRecord(); - ~FlowRecord(); +};*/ - void erase(); - void reuse(); +//#define MAX_KEY_LENGTH (std::max(sizeof(flow_key_v4_t), sizeof(flow_key_v6_t))) - inline bool is_empty() const; - inline bool belongs(uint64_t pkt_hash) const; - void create(const Packet &pkt, uint64_t pkt_hash); - void update(const Packet &pkt, bool src); -}; struct FlowEndReasonStats { uint64_t active_timeout; @@ -308,117 +95,85 @@ struct FlowRecordStats { uint64_t packets_count_51_plus; }; +struct FlowCacheStats{ + uint64_t empty; + uint64_t not_empty; + uint64_t hits; + uint64_t expired; + uint64_t flushed; + uint64_t lookups; + uint64_t lookups2; + uint64_t flows_in_cache; + uint64_t total_exported; +}; + class NHTFlowCache : TelemetryUtils, public StoragePlugin { public: NHTFlowCache(); - ~NHTFlowCache(); - void init(const char *params); - void close(); - void set_queue(ipx_ring_t *queue); - OptionsParser *get_parser() const { return new CacheOptParser(); } - std::string get_name() const { return "cache"; } + ~NHTFlowCache() override; + void init(const char* params) override; + void close() override; + void set_queue(ipx_ring_t* queue) override; + OptionsParser * get_parser() const override; + std::string get_name() const noexcept override; - int put_pkt(Packet &pkt); - void export_expired(time_t ts); + int put_pkt(Packet& pkt) override; + void export_expired(time_t ts) override; /** * @brief Set and configure the telemetry directory where cache stats will be stored. */ void set_telemetry_dir(std::shared_ptr dir) override; - #ifdef WITH_CTT - - int plugins_post_create(Flow &rec, Packet &pkt) { - int ret = StoragePlugin::plugins_post_create(rec, pkt); - rec.record_in_ctt = false; - //if (only_metadata_required(rec)) { - if (only_metadata_required(rec)) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.record_in_ctt = true; - } - return ret; - } - - // override post_update method - int plugins_post_update(Flow &rec, Packet &pkt) { - int ret = StoragePlugin::plugins_post_update(rec, pkt); - //if (only_metadata_required(rec) && !rec.ctt_state) { - if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.record_in_ctt = true; - } - return ret; - } - - // override pre_export method - void plugins_pre_export(Flow &rec) { - if (rec.record_in_ctt) { - rec.is_delayed = true; - rec.delay_time = time(nullptr) + 1; - m_ctt_controller.export_record(rec.flow_hash_ctt); - rec.record_in_ctt = false; - return; - } - if (rec.is_delayed) { - return; - } else { - StoragePlugin::plugins_pre_export(rec); - } - } - - #endif /* WITH_CTT */ - private: uint32_t m_cache_size; uint32_t m_line_size; uint32_t m_line_mask; - uint32_t m_line_new_idx; - uint32_t m_qsize; - uint32_t m_qidx; - uint32_t m_timeout_idx; - uint64_t m_flows_in_cache = 0; - uint64_t m_total_exported = 0; -#ifdef FLOW_CACHE_STATS - uint64_t m_empty; - uint64_t m_not_empty; - uint64_t m_hits; - uint64_t m_expired; - uint64_t m_flushed; - uint64_t m_lookups; - uint64_t m_lookups2; -#endif /* FLOW_CACHE_STATS */ + uint32_t m_new_flow_insert_index; + uint32_t m_queue_size; + uint32_t m_queue_index{0}; + uint32_t m_last_exported_on_timeout_index{0}; + uint32_t m_active; uint32_t m_inactive; bool m_split_biflow; bool m_enable_fragmentation_cache; - uint8_t m_keylen; - char m_key[MAX_KEY_LENGTH]; - char m_key_inv[MAX_KEY_LENGTH]; - FlowRecord **m_flow_table; - FlowRecord *m_flow_records; -#ifdef WITH_CTT - CttController m_ctt_controller; -#endif /* WITH_CTT */ + //uint8_t m_keylen; + //std::array m_key_hash_buffer; + std::variant m_key; + std::variant m_key_reversed; + //std::array m_key_reversed_hash_buffer; + std::vector m_flow_table; + std::vector m_flows; + FragmentationCache m_fragmentation_cache; FlowEndReasonStats m_flow_end_reason_stats = {}; FlowRecordStats m_flow_record_stats = {}; + FlowCacheStats m_cache_stats = {}; +#ifdef WITH_CTT + CttController m_ctt_controller; +#endif /* WITH_CTT */ + void try_to_fill_ports_to_fragmented_packet(Packet& packet); void flush(Packet &pkt, size_t flow_index, int ret, bool source_flow); - bool create_hash_key(Packet &pkt); - void export_flow(size_t index); - static uint8_t get_export_reason(Flow &flow); + bool create_hash_key(const Packet &packet); + static uint8_t get_export_reason(const Flow &flow); void finish(); - + void allocate_table(); void update_flow_end_reason_stats(uint8_t reason); void update_flow_record_stats(uint64_t packets_count); telemetry::Content get_cache_telemetry(); void prefetch_export_expired() const; - -#ifdef FLOW_CACHE_STATS + void get_parser_options(CacheOptParser& parser) noexcept; + void push_to_export_queue(size_t flow_index) noexcept; + std::tuple, std::optional, bool> find_flow_index(const Packet& packet) noexcept; + bool export_on_inactive_timeout(size_t flow_index, time_t ts) noexcept; + bool export_on_active_timeout(size_t flow_index, time_t ts) noexcept; + void export_flow(size_t flow_index, int reason); + void export_flow(size_t flow_index); void print_report(); -#endif /* FLOW_CACHE_STATS */ }; } diff --git a/storage/cacheOptParser.cpp b/storage/cacheOptParser.cpp new file mode 100644 index 00000000..2899243a --- /dev/null +++ b/storage/cacheOptParser.cpp @@ -0,0 +1,101 @@ +#include "cacheOptParser.hpp" +#include +#include +#include + +namespace ipxp { + + + +#ifdef IPXP_FLOW_CACHE_SIZE +static const uint32_t DEFAULT_FLOW_CACHE_SIZE = IPXP_FLOW_CACHE_SIZE; +#else +static const uint32_t DEFAULT_FLOW_CACHE_SIZE = 17; // 131072 records total +#endif /* IPXP_FLOW_CACHE_SIZE */ + +#ifdef IPXP_FLOW_LINE_SIZE +static const uint32_t DEFAULT_FLOW_LINE_SIZE = IPXP_FLOW_LINE_SIZE; +#else +static const uint32_t DEFAULT_FLOW_LINE_SIZE = 4; // 16 records per line +#endif /* IPXP_FLOW_LINE_SIZE */ + +static const uint32_t DEFAULT_INACTIVE_TIMEOUT = 30; +static const uint32_t DEFAULT_ACTIVE_TIMEOUT = 300; + +static_assert(std::is_unsigned(), "Static checks of default cache sizes won't properly work without unsigned type."); +static_assert(bitcount(-1) > DEFAULT_FLOW_CACHE_SIZE, "Flow cache size is too big to fit in variable!"); +static_assert(bitcount(-1) > DEFAULT_FLOW_LINE_SIZE, "Flow cache line size is too big to fit in variable!"); + +static_assert(DEFAULT_FLOW_LINE_SIZE >= 1, "Flow cache line size must be at least 1!"); +static_assert(DEFAULT_FLOW_CACHE_SIZE >= DEFAULT_FLOW_LINE_SIZE, "Flow cache size must be at least cache line size!"); + +CacheOptParser::CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"), + m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE), + m_active(DEFAULT_ACTIVE_TIMEOUT), m_inactive(DEFAULT_INACTIVE_TIMEOUT), m_split_biflow(false), + m_enable_fragmentation_cache(true), m_frag_cache_size(10007), // Prime for better distribution in hash table + m_frag_cache_timeout(3) + { + register_option("s", "size", "EXPONENT", "Cache size exponent to the power of two", + [this](const char *arg){try {unsigned exp = str2num(arg); + if (exp < 4 || exp > 30) { + throw PluginError("Flow cache size must be between 4 and 30"); + } + m_cache_size = static_cast(1) << exp; + } catch(std::invalid_argument &e) {return false;} return true;}, + OptionFlags::RequiredArgument); + register_option("l", "line", "EXPONENT", "Cache line size exponent to the power of two", + [this](const char *arg){try {m_line_size = static_cast(1) << str2num(arg); + if (m_line_size < 1) { + throw PluginError("Flow cache line size must be at least 1"); + } + } catch(std::invalid_argument &e) {return false;} return true;}, + OptionFlags::RequiredArgument); + register_option("a", "active", "TIME", "Active timeout in seconds", + [this](const char *arg){try {m_active = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, + OptionFlags::RequiredArgument); + register_option("i", "inactive", "TIME", "Inactive timeout in seconds", + [this](const char *arg){try {m_inactive = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, + OptionFlags::RequiredArgument); + register_option("S", "split", "", "Split biflows into uniflows", + [this](const char *arg){ m_split_biflow = true; return true;}, OptionFlags::NoArgument); + register_option("fe", "frag-enable", "true|false", "Enable/disable fragmentation cache. Enabled (true) by default.", + [this](const char *arg){ + if (strcmp(arg, "true") == 0) { + m_enable_fragmentation_cache = true; + } else if (strcmp(arg, "false") == 0) { + m_enable_fragmentation_cache = false; + } else { + return false; + } + return true; + }, OptionFlags::RequiredArgument); + register_option("fs", "frag-size", "size", "Size of fragmentation cache, must be at least 1. Default value is 10007.", [this](const char *arg) { + try { + m_frag_cache_size = str2num(arg); + } catch(std::invalid_argument &e) { + return false; + } + return m_frag_cache_size > 0; + }); + register_option("ft", "frag-timeout", "TIME", "Timeout of fragments in fragmentation cache in seconds. Default value is 3.", [this](const char *arg) { + try { + m_frag_cache_timeout = str2num(arg); + } catch(std::invalid_argument &e) { + return false; + } + return true; + }); + + #ifdef WITH_CTT + register_option("d", "dev", "DEV", "Device name", + [this](const char *arg) { + m_dev = arg; + return true; + }, + OptionFlags::RequiredArgument); + #endif /* WITH_CTT */ + + } + + +} // ipxp \ No newline at end of file diff --git a/storage/cacheOptParser.hpp b/storage/cacheOptParser.hpp new file mode 100644 index 00000000..2731d8b4 --- /dev/null +++ b/storage/cacheOptParser.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + + + + + + +namespace ipxp { + +class CacheOptParser : public OptionsParser +{ +public: + uint32_t m_cache_size; + uint32_t m_line_size; + uint32_t m_active; + uint32_t m_inactive; + bool m_split_biflow; + bool m_enable_fragmentation_cache; + std::size_t m_frag_cache_size; + time_t m_frag_cache_timeout; + #ifdef WITH_CTT + std::string m_dev; + #endif /* WITH_CTT */ + + CacheOptParser(); +}; + + +} // ipxp diff --git a/storage/cacheRowSpan.cpp b/storage/cacheRowSpan.cpp new file mode 100644 index 00000000..9e0f6320 --- /dev/null +++ b/storage/cacheRowSpan.cpp @@ -0,0 +1,42 @@ +#include +#include "cacheRowSpan.hpp" + +namespace ipxp { + +CacheRowSpan::CacheRowSpan(FlowRecord** begin, size_t count) noexcept + : m_begin(begin), m_count(count) +{ +} + +std::optional CacheRowSpan::find_by_hash(uint64_t hash) const noexcept +{ + for (size_t i = 0; i < m_count; ++i) { + if (m_begin[i]->belongs(hash)) { + return i; + } + } + return std::nullopt; +} + +void CacheRowSpan::advance_flow_to(size_t from, size_t to) noexcept +{ + std::rotate(m_begin + to, m_begin + from, m_begin + from + 1); +} + +void CacheRowSpan::advance_flow(size_t flow_index) noexcept +{ + advance_flow_to(flow_index, 0); +} + +std::optional CacheRowSpan::find_empty() const noexcept +{ + auto it = std::find_if(m_begin, m_begin + m_count, [](const FlowRecord* flow) { + return flow->is_empty(); + }); + if (it == m_begin + m_count) { + return std::nullopt; + } + return it - m_begin; +} + +} // ipxp \ No newline at end of file diff --git a/storage/cacheRowSpan.hpp b/storage/cacheRowSpan.hpp new file mode 100644 index 00000000..6b8ca4c9 --- /dev/null +++ b/storage/cacheRowSpan.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include +#include "flowRecord.hpp" + +namespace ipxp { + +class CacheRowSpan { +public: + CacheRowSpan(FlowRecord** begin, size_t count) noexcept; + std::optional find_by_hash(uint64_t hash) const noexcept; + void advance_flow(size_t flow_index) noexcept; + void advance_flow_to(size_t from, size_t to) noexcept; + std::optional find_empty() const noexcept; +private: + FlowRecord** m_begin; + size_t m_count; +}; + +} // ipxp diff --git a/storage/cttController.cpp b/storage/cttController.cpp new file mode 100644 index 00000000..708fdb5f --- /dev/null +++ b/storage/cttController.cpp @@ -0,0 +1,85 @@ +// +// Created by zaida on 21.12.2024. +// + +#include "cttController.hpp" + +#ifdef WITH_CTT + +namespace ipxp { + +void CttController::init(const std::string& nfb_dev, unsigned ctt_comp_index) { + m_commander = std::make_unique(ctt::NfbParams{nfb_dev, ctt_comp_index}); + try { + // Get UserInfo to determine key, state, and state_mask sizes + ctt::UserInfo user_info = m_commander->get_user_info(); + key_size_bytes = (user_info.key_bit_width + 7) / 8; + state_size_bytes = (user_info.state_bit_width + 7) / 8; + state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; + + // Enable the CTT + std::future enable_future = m_commander->enable(true); + enable_future.wait(); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( + OffloadMode::PACKET_OFFLOAD, + MetaType::FULL, + ts); + m_commander->write_record(std::move(key), std::move(state)); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::export_record(uint64_t flow_hash_ctt) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + m_commander->export_and_delete_record(std::move(key)); + } + catch (const std::exception& e) { + throw; + } +} + +std::vector CttController::assemble_key(uint64_t flow_hash_ctt) +{ + std::vector key(key_size_bytes, std::byte(0)); + for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { + key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); + } + return key; +} + +std::vector CttController::assemble_state( + OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) +{ + std::vector state(state_size_bytes, std::byte(0)); + std::vector state_mask(state_mask_size_bytes, std::byte(0)); + + state[0] = static_cast(offload_mode); + state[1] = static_cast(meta_type); + + // timestamp in sec/ns format, 32+32 bits - 64 bits in total + for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { + state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); + } + for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { + state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); + } + return state; +} + +} // ipxp + +#endif /* WITH_CTT */ diff --git a/storage/cttController.hpp b/storage/cttController.hpp new file mode 100644 index 00000000..4104ad65 --- /dev/null +++ b/storage/cttController.hpp @@ -0,0 +1,82 @@ +#pragma once + +//#define WITH_CTT 1 // TODO REMOVE + +#ifdef WITH_CTT +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ipxp { + +class CttController { +public: + enum class OffloadMode : uint8_t { + NO_OFFLOAD = 0x0, + PACKET_OFFLOAD = 0x1, + META_EXPORT = 0x2, + PACKET_OFFLOAD_WITH_EXPORT = 0x3 + }; + enum class MetaType : uint8_t { + FULL = 0x0, + HALF = 0x1, + TS_ONLY = 0x2, + NO_META = 0x3 + }; + /** + * @brief init the CTT. + * + * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). + * @param ctt_comp_index The index of the CTT component. + */ + void init(const std::string& nfb_dev, unsigned ctt_comp_index); + + /** + * @brief Command: mark a flow for offload. + * + * @param flow_hash_ctt The flow hash to be offloaded. + */ + void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); + + /** + * @brief Command: export a flow from the CTT. + * + * @param flow_hash_ctt The flow hash to be exported. + */ + void export_record(uint64_t flow_hash_ctt); + +private: + std::unique_ptr m_commander; + size_t key_size_bytes; + size_t state_size_bytes; + size_t state_mask_size_bytes; + + /** + * @brief Assembles the state vector from the given values. + * + * @param offload_mode The offload mode. + * @param meta_type The metadata type. + * @param timestamp_first The first timestamp of the flow. + * @return A byte vector representing the assembled state vector. + */ + std::vector assemble_state( + OffloadMode offload_mode, MetaType meta_type, + const struct timeval& timestamp_first); + + /** + * @brief Assembles the key vector from the given flow hash. + * + * @param flow_hash_ctt The flow hash. + * @return A byte vector representing the assembled key vector. + */ + std::vector assemble_key(uint64_t flow_hash_ctt); +}; + +} // ipxp + +#endif /* WITH_CTT */ diff --git a/storage/flowKey.tpp b/storage/flowKey.tpp new file mode 100644 index 00000000..aefff366 --- /dev/null +++ b/storage/flowKey.tpp @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include + +namespace ipxp { + +template +struct FlowKey { + uint16_t src_port; + uint16_t dst_port; + uint8_t proto; + uint8_t ip_version; + std::array src_ip; + std::array dst_ip; + uint16_t vlan_id; +} __attribute__((packed)); + +using FlowKeyv4 = FlowKey<4>; +using FlowKeyv6 = FlowKey<16>; + +} // namespace ipxp \ No newline at end of file diff --git a/storage/flowRecord.cpp b/storage/flowRecord.cpp new file mode 100644 index 00000000..f0d35cb5 --- /dev/null +++ b/storage/flowRecord.cpp @@ -0,0 +1,132 @@ +#include +#include +#include +#include "flowRecord.hpp" + +namespace ipxp { + +FlowRecord::FlowRecord() +{ + erase(); +}; + +FlowRecord::~FlowRecord() +{ + erase(); +}; + +void FlowRecord::erase() +{ + m_flow.remove_extensions(); + m_hash = 0; + memset(&m_flow.time_first, 0, sizeof(m_flow.time_first)); + memset(&m_flow.time_last, 0, sizeof(m_flow.time_last)); + m_flow.ip_version = 0; + m_flow.ip_proto = 0; + memset(&m_flow.src_ip, 0, sizeof(m_flow.src_ip)); + memset(&m_flow.dst_ip, 0, sizeof(m_flow.dst_ip)); + m_flow.src_port = 0; + m_flow.dst_port = 0; + m_flow.src_packets = 0; + m_flow.dst_packets = 0; + m_flow.src_bytes = 0; + m_flow.dst_bytes = 0; + m_flow.src_tcp_flags = 0; + m_flow.dst_tcp_flags = 0; +} +void FlowRecord::reuse() +{ + m_flow.remove_extensions(); + m_flow.time_first = m_flow.time_last; + m_flow.src_packets = 0; + m_flow.dst_packets = 0; + m_flow.src_bytes = 0; + m_flow.dst_bytes = 0; + m_flow.src_tcp_flags = 0; + m_flow.dst_tcp_flags = 0; +} + +bool FlowRecord::is_empty() const noexcept +{ + return m_hash == 0; +} + + bool FlowRecord::belongs(uint64_t hash) const noexcept +{ + return hash == m_hash; +} + +void FlowRecord::create(const Packet &pkt, uint64_t hash) +{ + m_flow.src_packets = 1; + + m_hash = hash; + + m_flow.time_first = pkt.ts; + m_flow.time_last = pkt.ts; + m_flow.flow_hash = hash; + + memcpy(m_flow.src_mac, pkt.src_mac, 6); + memcpy(m_flow.dst_mac, pkt.dst_mac, 6); + + if (pkt.ip_version == IP::v4) { + m_flow.ip_version = pkt.ip_version; + m_flow.ip_proto = pkt.ip_proto; + m_flow.src_ip.v4 = pkt.src_ip.v4; + m_flow.dst_ip.v4 = pkt.dst_ip.v4; + m_flow.src_bytes = pkt.ip_len; + } else if (pkt.ip_version == IP::v6) { + m_flow.ip_version = pkt.ip_version; + m_flow.ip_proto = pkt.ip_proto; + memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); + memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); + m_flow.src_bytes = pkt.ip_len; + } + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + m_flow.src_tcp_flags = pkt.tcp_flags; + } else if (pkt.ip_proto == IPPROTO_UDP) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + } else if (pkt.ip_proto == IPPROTO_ICMP || + pkt.ip_proto == IPPROTO_ICMPV6) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + } + #ifdef WITH_CTT + m_flow.is_delayed = false; + m_delayed_flow_waiting = false; + #endif /* WITH_CTT */ +} + +void FlowRecord::update(const Packet &pkt, bool src) +{ + /*if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow + auto flow_hash = m_hash; + m_delayed_flow = m_flow; + m_delayed_flow_waiting = true; + erase(); // erase the old flow, keeping the delayed flow + create(pkt, flow_hash); + return; + }*/ + m_flow.time_last = pkt.ts; + if (src) { + m_flow.src_packets++; + m_flow.src_bytes += pkt.ip_len; + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.src_tcp_flags |= pkt.tcp_flags; + } + } else { + m_flow.dst_packets++; + m_flow.dst_bytes += pkt.ip_len; + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.dst_tcp_flags |= pkt.tcp_flags; + } + } +} + +} // ipxp \ No newline at end of file diff --git a/storage/flowRecord.hpp b/storage/flowRecord.hpp new file mode 100644 index 00000000..6f6eae1a --- /dev/null +++ b/storage/flowRecord.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include + +namespace ipxp { + +class alignas(64) FlowRecord +{ + uint64_t m_hash; + + public: + Flow m_flow; +#ifdef WITH_CTT + Flow m_delayed_flow; + bool m_delayed_flow_waiting; +#endif /* WITH_CTT */ + + FlowRecord(); + ~FlowRecord(); + + void erase(); + void reuse(); + + bool is_empty() const noexcept; + bool belongs(uint64_t pkt_hash) const noexcept; + void create(const Packet &pkt, uint64_t pkt_hash); + void update(const Packet &pkt, bool src); +}; + +} // ipxp