Skip to content

Commit

Permalink
OBJECT STREAM
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 11, 2024
1 parent 0b6fe5e commit 17cb0d3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
3 changes: 1 addition & 2 deletions nano/lib/object_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class array_stream : private object_stream_base

array_stream (array_stream const &) = delete; // Disallow copying

private:
public:
template <class Value>
void write_single (Value const & value)
{
Expand All @@ -290,7 +290,6 @@ class array_stream : private object_stream_base
ctx.end_array_element ();
}

public:
// Handle `.write (container)`
template <class Container>
void write (Container const & container)
Expand Down
6 changes: 5 additions & 1 deletion nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

lock_a.lock ();

queue.periodic_update ();
bool updated = queue.periodic_update ();
if (updated)
{
node.logger.info (nano::log::type::test, "Fair queue: {}", nano::streamed (queue));
}

timer_l.start ();

Expand Down
60 changes: 30 additions & 30 deletions nano/node/fair_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include <nano/lib/locks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/object_stream.hpp>
#include <nano/lib/object_stream_adapters.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/threading.hpp>
Expand Down Expand Up @@ -50,21 +53,21 @@ class fair_queue final

auto operator<=> (source const &) const = default;

friend std::ostream & operator<< (std::ostream & os, source const & source)
void operator() (nano::object_stream & obs) const
{
os << "sources: ";
std::apply ([&os] (auto const &... sources) {
((os << to_string (sources) << ", "), ...);
},
source.sources);
if (source.channel)
obs.write ("sources", [&] (nano::array_stream & ars) {
std::apply ([&ars] (auto const &... sources) {
((ars.write_single (sources)), ...);
},
sources);
});

if (channel)
{
os << " channel: " << source.channel->to_string ()
<< " (" << source.channel->get_node_id () << ")"
<< " [" << source.channel.get () << "]"
<< " ( use_count: " << source.channel.use_count () << ", alive: " << source.channel->alive () << ")";
obs.write ("channel", channel);
obs.write ("channel_ptr", channel.get ());
obs.write ("channel_ref_count", channel.use_count ());
}
return os;
}
};

Expand Down Expand Up @@ -111,6 +114,13 @@ class fair_queue final
{
return requests.size ();
}

void operator() (nano::object_stream & obs) const
{
obs.write ("priority", priority);
obs.write ("max_size", max_size);
obs.write ("requests", requests.size ());
}
};

public:
Expand Down Expand Up @@ -172,29 +182,11 @@ class fair_queue final
cleanup ();
update ();

std::cout << "fair_queue:\n"
<< *this
<< std::endl;

return true; // Updated
}
return false; // Not updated
}

void dump (std::ostream & os) const
{
for (auto const & [source, queue] : queues)
{
os << "queue: " << source << " - " << queue.size () << " / " << queue.max_size << " (priority: " << queue.priority << ")\n";
}
}

friend std::ostream & operator<< (std::ostream & os, fair_queue const & queue)
{
queue.dump (os);
return os;
}

/**
* Push a request to the appropriate queue based on the source
* Request will be dropped if the queue is full
Expand Down Expand Up @@ -327,5 +319,13 @@ class fair_queue final
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
return composite;
}

void operator() (nano::object_stream & obs) const
{
obs.write_range ("queues", queues, [] (auto const & entry, nano::object_stream & obs) {
obs.write ("source", entry.first);
obs.write ("queue", entry.second);
});
}
};
}
1 change: 1 addition & 0 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ void nano::transport::channel::operator() (nano::object_stream & obs) const
obs.write ("endpoint", get_endpoint ());
obs.write ("peering_endpoint", get_peering_endpoint ());
obs.write ("node_id", get_node_id ().to_node_id ());
obs.write ("alive", alive ());
}

0 comments on commit 17cb0d3

Please sign in to comment.