Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/reduce mem pathfinder #3325

Open
wants to merge 38 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
87be301
Pathfinder lazily evaluates and writes parameters instead of taking e…
SteveBronder Dec 12, 2024
3c932f0
add docs
SteveBronder Dec 13, 2024
f1a8e56
remove stringstream from unique stream writer
SteveBronder Dec 13, 2024
48bbb6e
update laplace
SteveBronder Dec 13, 2024
0b46b4a
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 13, 2024
c644df0
remove newline
SteveBronder Dec 13, 2024
814f8df
fixing pathfinder return results
SteveBronder Dec 17, 2024
e484fa1
adds concurrent queue so writes are thread safe to one file
SteveBronder Dec 18, 2024
e88ca19
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 18, 2024
6b24bbf
fix docs for multi_stream_writer
SteveBronder Dec 18, 2024
0e48078
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 18, 2024
298761e
fix docs for multi_stream_writer
SteveBronder Dec 18, 2024
ae3740c
fix docs for multi_stream_writer
SteveBronder Dec 18, 2024
b98f60f
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 18, 2024
a18d7e9
update normal_glm test for better inits
SteveBronder Dec 18, 2024
9250e76
Merge remote-tracking branch 'refs/remotes/origin/feature/reduce-mem-…
SteveBronder Dec 18, 2024
4a058bc
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 18, 2024
09d66e5
update headers
SteveBronder Dec 18, 2024
c25ec46
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Dec 18, 2024
2edfd85
fix stringstream in unique stream vector writer
SteveBronder Jan 6, 2025
6814bd1
fix writes for single pathfinder when doing multi pathfinder
SteveBronder Jan 7, 2025
e1c59da
Merge commit '108daa93b6dd0d7b351864494bdcd5a255f6a765' into HEAD
yashikno Jan 7, 2025
6326a66
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 7, 2025
79e3a9d
update time string for single path in multi path
SteveBronder Jan 8, 2025
a012b88
update stream writer is_nonnull and the writes for multi
SteveBronder Jan 8, 2025
e00d78e
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 8, 2025
a2f023d
update docs and fix multi write with psis turned off so that multiple…
SteveBronder Jan 9, 2025
2047ff4
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 9, 2025
d395878
adds a concurrent writer that spins a seperate thread for writing tbb…
SteveBronder Jan 10, 2025
f522a83
Merge remote-tracking branch 'refs/remotes/origin/feature/reduce-mem-…
SteveBronder Jan 10, 2025
e7a2452
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 10, 2025
185bfa9
add apply to multi headers
SteveBronder Jan 10, 2025
f2c4c1b
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 10, 2025
e48467b
add apply to multi headers
SteveBronder Jan 10, 2025
be3cf7e
add includes for concurrent writer
SteveBronder Jan 10, 2025
2ab9201
move concurrent writer to its own file
SteveBronder Jan 15, 2025
a977ad4
[Jenkins] auto-formatting by clang-format version 10.0.0-4ubuntu1
stan-buildbot Jan 15, 2025
a4e70ff
add back filling_start_row
SteveBronder Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/stan_math
104 changes: 104 additions & 0 deletions src/stan/callbacks/concurrent_writer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#ifndef STAN_CALLBACKS_CONCURRENT_WRITER_HPP
#define STAN_CALLBACKS_CONCURRENT_WRITER_HPP

#include <stan/math/prim/fun/Eigen.hpp>
#include <stan/math/prim/meta.hpp>
#include <tbb/concurrent_queue.h>
#include <functional>
#include <string>
#include <thread>
#include <vector>

namespace stan::callbacks {
#ifdef STAN_THREADS
/**
* Takes a writer and makes it thread safe via multiple queues.
* At the first write a single busy thread is spawned to write to the writer.
* This class uses an `std::thread` instead of a tbb task graph because
* of deadlocking issues. A deadlock can occur if TBB gives all threads to the
* parallel for loop, and all threads hit an instance of max capacity. TBB can
* choose to wait for a thread to finish instead of spinning up the write
* thread. So to circumvent that issue, we use an std::thread.
* @tparam Writer A type that inherits from `writer`
*/
template <typename Writer>
struct concurrent_writer {
std::reference_wrapper<Writer> writer;
tbb::concurrent_bounded_queue<std::string> str_messages_{};
tbb::concurrent_bounded_queue<std::vector<std::string>> vec_str_messages_{};
tbb::concurrent_bounded_queue<Eigen::RowVectorXd> eigen_messages_{};
bool continue_writing_{true};
std::thread thread_;
/**
* Constructs a concurrent writer from a writer and spins up a thread for
* writing.
* @param writer A writer to write to
*/
explicit concurrent_writer(Writer& writer) : writer(writer) {
str_messages_.set_capacity(1000);
vec_str_messages_.set_capacity(1000);
eigen_messages_.set_capacity(1000);
thread_ = std::thread([&]() {
std::string str;
std::vector<std::string> vec_str;
Eigen::RowVectorXd eigen;
while (continue_writing_
|| !(str_messages_.empty() && vec_str_messages_.empty()
&& eigen_messages_.empty())) {
while (str_messages_.try_pop(str)) {
writer(str);
}
while (vec_str_messages_.try_pop(vec_str)) {
writer(vec_str);
}
while (eigen_messages_.try_pop(eigen)) {
writer(eigen);
}
}
});
}
/**
* Place a value in a queue for writing.
* @tparam T Either an `std::vector<std::string|double>`, an Eigen vector, or
* a string
* @param t A value to put on a queue
*/
template <typename T>
void operator()(T&& t) {
if constexpr (stan::is_std_vector<T>::value) {
if constexpr (std::is_arithmetic_v<stan::value_type_t<T>>) {
eigen_messages_.push(Eigen::RowVectorXd::Map(t.data(), t.size()));
} else {
vec_str_messages_.push(t);
}
} else if constexpr (std::is_same_v<T, std::string>) {
str_messages_.push(t);
} else if constexpr (stan::is_eigen_row_vector<T>::value) {
eigen_messages_.push(t);
} else if constexpr (stan::is_eigen_col_vector<T>::value) {
eigen_messages_.push(t.transpose());
} else {
static_assert(1, "Unsupported type passed to concurrent_writer");
}
}
void operator()() { str_messages_.push(writer.get().comment_prefix()); }
void wait() {
continue_writing_ = false;
thread_.join();
}
};
#else
template <typename Writer>
struct concurrent_writer {
std::reference_wrapper<Writer> writer;
explicit concurrent_writer(Writer& writer) : writer(writer) {}
template <typename T>
void operator()(T&& t) {
writer(std::forward<T>(t));
}
void operator()() { writer(); }
inline static constexpr void wait() {}
};
#endif
} // namespace stan::callbacks
#endif
85 changes: 85 additions & 0 deletions src/stan/callbacks/multi_writer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#ifndef STAN_CALLBACKS_MULTI_WRITER_HPP
#define STAN_CALLBACKS_MULTI_WRITER_HPP

#include <stan/math/prim/fun/Eigen.hpp>
#include <stan/callbacks/writer.hpp>
#include <stan/math/prim/functor/for_each.hpp>
#include <stan/math/prim/functor/apply.hpp>
#include <memory>
#include <ostream>
#include <string>
#include <vector>

namespace stan {
namespace callbacks {

/**
* `multi_writer` is an layer on top of a writer class that
* allows for multiple output streams to be written to.
* @tparam Writers A parameter pack of types that inherit from `writer`
*/
template <typename... Writers>
class multi_writer {
public:
/**
* Constructs a multi stream writer from a parameter pack of writers.
*
* @param[in, out] args A parameter pack of writers
*/
template <typename... Args>
explicit multi_writer(Args&&... args)
: output_(std::forward<Args>(args)...) {}

multi_writer();

/**
* @tparam T Any type accepted by a `writer` overload
* @param[in] x A value to write to the output streams
*/
template <typename T>
void operator()(T&& x) {
stan::math::for_each([&](auto&& output) { output(x); }, output_);
}
void operator()() {
stan::math::for_each([](auto&& output) { output(); }, output_);
}

/**
* Checks if all underlying writers are nonnull.
*/
inline bool is_nonnull() const noexcept {
return stan::math::apply(
[](auto&&... output) { return (output.is_nonnull() && ...); }, output_);
}

/**
* Get the underlying stream
*/
inline auto& get_stream() noexcept { return output_; }
const char* comment_prefix() const noexcept {
return std::get<0>(output_).comment_prefix();
}

private:
/**
* Output stream
*/
std::tuple<std::reference_wrapper<Writers>...> output_;
};

namespace internal {
template <typename T>
struct is_multi_writer : std::false_type {};

template <typename... Types>
struct is_multi_writer<multi_writer<Types...>> : std::true_type {};
} // namespace internal

template <typename T>
inline constexpr bool is_multi_writer_v
= internal::is_multi_writer<std::decay_t<T>>::value;

} // namespace callbacks
} // namespace stan

#endif
11 changes: 11 additions & 0 deletions src/stan/callbacks/stream_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ class stream_writer : public writer {
output_ << comment_prefix_ << message << std::endl;
}

/**
* Checks if stream is valid.
*/
virtual bool is_nonnull() const noexcept { return output_.good(); }
/**
* Return the comment prefix
*/
const char* comment_prefix() const noexcept {
return comment_prefix_.c_str();
}

private:
/**
* Output stream
Expand Down
10 changes: 10 additions & 0 deletions src/stan/callbacks/tee_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ class tee_writer final : public writer {
writer2_(message);
}

/**
* Checks if both streams are valid.
*/
virtual bool is_nonnull() const noexcept {
return writer1_.is_nonnull() && writer2_.is_nonnull();
}
const char* comment_prefix() const noexcept {
return writer1_.comment_prefix();
}

private:
/**
* The first writer
Expand Down
16 changes: 14 additions & 2 deletions src/stan/callbacks/unique_stream_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ class unique_stream_writer final : public writer {
*output_ << comment_prefix_ << message << std::endl;
}

/**
* Checks if stream is valid.
*/
bool is_nonnull() const noexcept { return output_ != nullptr; }

const char* comment_prefix() const noexcept {
return comment_prefix_.c_str();
}

private:
/**
* Comma formatter for writing Eigen matrices
Expand Down Expand Up @@ -141,6 +150,8 @@ class unique_stream_writer final : public writer {
*/
template <class T>
void write_vector(const std::vector<T>& v) {
std::stringstream ss;
ss.copyfmt(*output_);
if (output_ == nullptr)
return;
if (v.empty()) {
Expand All @@ -149,9 +160,10 @@ class unique_stream_writer final : public writer {
auto last = v.end();
--last;
for (auto it = v.begin(); it != last; ++it) {
*output_ << *it << ",";
ss << *it << ",";
}
*output_ << v.back() << std::endl;
ss << v.back() << std::endl;
*output_ << ss.str();
}
};

Expand Down
6 changes: 6 additions & 0 deletions src/stan/callbacks/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class writer {
*/
virtual void operator()(const std::string& message) {}

/**
* Checks if stream is valid.
*/
virtual bool is_nonnull() const noexcept { return false; }
virtual const char* comment_prefix() const noexcept { return "# "; }

/**
* Writes multiple rows and columns of values in csv format.
*
Expand Down
Loading