From c1319c5f096da5aba7e9af4d68f9a23a119de49c Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Sun, 8 May 2022 13:58:37 +0000 Subject: [PATCH 01/11] trying to add mlpack to ch --- CMakeLists.txt | 2 ++ src/CMakeLists.txt | 6 ++++++ src/Interpreters/_kek.cpp | 21 +++++++++++++++++++++ 3 files changed, 29 insertions(+) create mode 100644 src/Interpreters/_kek.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dcd313dcb3c2..7eb642c1f789 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -567,3 +567,5 @@ add_subdirectory (tests) add_subdirectory (utils) include (cmake/sanitize_target_link_libraries.cmake) + +link_libraries(mlpack) \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 20db948abd09..29d9e68a0034 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -553,6 +553,12 @@ if (TARGET ch_contrib::rapidjson) dbms_target_link_libraries(PRIVATE ch_contrib::rapidjson) endif() +# if (TARGET mlpack) +# dbms_target_link_libraries(PUBLIC mlpack) +# endif() +dbms_target_link_libraries(PRIVATE mlpack) +target_link_libraries (clickhouse_common_io PRIVATE mlpack) + dbms_target_link_libraries(PUBLIC ch_contrib::consistent_hashing) include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake") diff --git a/src/Interpreters/_kek.cpp b/src/Interpreters/_kek.cpp new file mode 100644 index 000000000000..60efbb0e65ab --- /dev/null +++ b/src/Interpreters/_kek.cpp @@ -0,0 +1,21 @@ +#include +#include +#include + +void popa() { + arma::mat regressors({1.0, 2.0, 3.0}); + arma::rowvec responses({1.0, 4.0, 9.0}); + auto lr = mlpack::regression::LinearRegression(regressors, responses); + arma::mat testX({2.0}); + arma::rowvec testY; + lr.Predict(testX, testY); + // std::cout << testY << std::endl; + + // bool status = mlpack::data::Save("zhopa.bin", "sraka", lr); + + LOG_FATAL(&Poco::Logger::root(), "AOOAOAOOAAOO {}", testY); + // mlpack::regression::LinearRegression lr; + // bool status = mlpack::data::Load("zhopa.bin", "sraka", lr); + // lr.Predict(testX, testY); + // std::cout << status << std::endl; +} From 7ec8cb2408b33ea2f99f584eb703b2a696a8a2b9 Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Sun, 15 May 2022 14:10:34 +0000 Subject: [PATCH 02/11] add linreg model engine --- CMakeLists.txt | 4 +- cmake/find/FindMLPACK.cmake | 51 ++++++ contrib/boost-cmake/CMakeLists.txt | 55 +++++++ src/Access/Common/AccessType.h | 1 + src/CMakeLists.txt | 26 +++- src/Common/ErrorCodes.cpp | 4 + src/Storages/MLpack/LinRegSettings.cpp | 39 +++++ src/Storages/MLpack/LinRegSettings.h | 24 +++ src/Storages/MLpack/StorageLinReg.cpp | 207 +++++++++++++++++++++++++ src/Storages/MLpack/StorageLinReg.h | 46 ++++++ src/Storages/registerStorages.cpp | 2 + 11 files changed, 451 insertions(+), 8 deletions(-) create mode 100644 cmake/find/FindMLPACK.cmake create mode 100644 src/Storages/MLpack/LinRegSettings.cpp create mode 100644 src/Storages/MLpack/LinRegSettings.h create mode 100644 src/Storages/MLpack/StorageLinReg.cpp create mode 100644 src/Storages/MLpack/StorageLinReg.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 7eb642c1f789..c6dc678fc16b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -447,6 +447,8 @@ elseif (OS_FREEBSD) endif () link_libraries(global-group) +set (WERROR OFF) + if (WERROR) # Don't pollute CMAKE_CXX_FLAGS with -Werror as it will break some CMake checks. # Instead, adopt modern cmake usage requirement. @@ -567,5 +569,3 @@ add_subdirectory (tests) add_subdirectory (utils) include (cmake/sanitize_target_link_libraries.cmake) - -link_libraries(mlpack) \ No newline at end of file diff --git a/cmake/find/FindMLPACK.cmake b/cmake/find/FindMLPACK.cmake new file mode 100644 index 000000000000..c0536b9ba238 --- /dev/null +++ b/cmake/find/FindMLPACK.cmake @@ -0,0 +1,51 @@ +#.rst: +# FindMLPACK +# ------------- +# +# Find MLPACK +# +# Find the MLPACK C++ library +# +# Using MLPACK:: +# +# find_package(MLPACK REQUIRED) +# include_directories(${MLPACK_INCLUDE_DIRS}) +# add_executable(foo foo.cc) +# target_link_libraries(foo ${MLPACK_LIBRARIES}) +# +# This module sets the following variables:: +# +# MLPACK_FOUND - set to true if the library is found +# MLPACK_INCLUDE_DIRS - list of required include directories +# MLPACK_LIBRARIES - list of libraries to be linked +# MLPACK_VERSION_MAJOR - major version number +# MLPACK_VERSION_MINOR - minor version number +# MLPACK_VERSION_PATCH - patch version number +# MLPACK_VERSION_STRING - version number as a string (ex: "1.0.4") + +include(FindPackageHandleStandardArgs) + +# UNIX paths are standard, no need to specify them. +find_library(MLPACK_LIBRARY + NAMES mlpack + PATHS "$ENV{ProgramFiles}/mlpack/lib" "$ENV{ProgramFiles}/mlpack/lib64" "$ENV{ProgramFiles}/mlpack" +) +find_path(MLPACK_INCLUDE_DIR + NAMES mlpack/core.hpp mlpack/prereqs.hpp + PATHS "$ENV{ProgramFiles}/mlpack" +) + +find_package_handle_standard_args(MLPACK + REQUIRED_VARS MLPACK_LIBRARY MLPACK_INCLUDE_DIR +) + +if(MLPACK_FOUND) + set(MLPACK_INCLUDE_DIRS ${MLPACK_INCLUDE_DIR}) + set(MLPACK_LIBRARIES ${MLPACK_LIBRARY}) +endif() + +# Hide internal variables +mark_as_advanced( + MLPACK_INCLUDE_DIR + MLPACK_LIBRARY +) \ No newline at end of file diff --git a/contrib/boost-cmake/CMakeLists.txt b/contrib/boost-cmake/CMakeLists.txt index 3d66bc97971e..20681778a223 100644 --- a/contrib/boost-cmake/CMakeLists.txt +++ b/contrib/boost-cmake/CMakeLists.txt @@ -183,3 +183,58 @@ target_include_directories(_boost_circular_buffer SYSTEM BEFORE INTERFACE ${LIBR add_library(_boost_heap INTERFACE) add_library(boost::heap ALIAS _boost_heap) target_include_directories(_boost_heap SYSTEM BEFORE INTERFACE ${LIBRARY_DIR}) + +set (SRCS_SERIALIZATION + "${LIBRARY_DIR}/libs/serialization/src/archive_exception.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_archive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_iserializer.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_oserializer.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_pointer_iserializer.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_pointer_oserializer.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_serializer_map.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_text_iprimitive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_text_oprimitive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_text_wiprimitive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_text_woprimitive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_xml_archive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/basic_xml_grammar.ipp" + "${LIBRARY_DIR}/libs/serialization/src/binary_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/binary_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/binary_wiarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/binary_woarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/codecvt_null.cpp" + "${LIBRARY_DIR}/libs/serialization/src/extended_type_info_no_rtti.cpp" + "${LIBRARY_DIR}/libs/serialization/src/extended_type_info_typeid.cpp" + "${LIBRARY_DIR}/libs/serialization/src/extended_type_info.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_binary_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_text_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_text_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_text_wiarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_text_woarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_xml_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_xml_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_xml_wiarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/polymorphic_xml_woarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/stl_port.cpp" + "${LIBRARY_DIR}/libs/serialization/src/text_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/text_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/text_wiarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/text_woarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/utf8_codecvt_facet.cpp" + "${LIBRARY_DIR}/libs/serialization/src/void_cast.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_archive_exception.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_grammar.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_iarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_oarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_wgrammar.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_wiarchive.cpp" + "${LIBRARY_DIR}/libs/serialization/src/xml_woarchive.cpp" +) + +add_library (_boost_serialization ${SRCS_SERIALIZATION}) +add_library (boost::serialization ALIAS _boost_serialization) +target_include_directories (_boost_serialization PRIVATE ${LIBRARY_DIR}) \ No newline at end of file diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index d3500652f95a..d27243962f5e 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -183,6 +183,7 @@ enum class AccessType M(ODBC, "", GLOBAL, SOURCES) \ M(JDBC, "", GLOBAL, SOURCES) \ M(HDFS, "", GLOBAL, SOURCES) \ + M(MLPACK, "", GLOBAL, SOURCES) \ M(S3, "", GLOBAL, SOURCES) \ M(HIVE, "", GLOBAL, SOURCES) \ M(SOURCES, "", GROUP, ALL) \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 29d9e68a0034..97434ab2b603 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -99,6 +99,7 @@ add_headers_and_sources(clickhouse_common_io IO/S3) list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp) add_headers_and_sources(dbms Disks/IO) +add_headers_and_sources(dbms Storages/MLpack) if (TARGET ch_contrib::sqlite) add_headers_and_sources(dbms Databases/SQLite) endif() @@ -317,6 +318,25 @@ macro (dbms_target_link_libraries) endforeach () endmacro () +include (../cmake/find/FindMLPACK.cmake) +find_package(MLPACK REQUIRED) +# include_directories(${MLPACK_INCLUDE_DIRS}) +# target_link_libraries(clickhouse_interpreters PUBLIC ${MLPACK_LIBRARIES}) + +find_package(Armadillo REQUIRED) + +dbms_target_link_libraries(PUBLIC ${MLPACK_LIBRARIES} ${ARMADILLO_LIBRARIES}) +target_link_libraries (clickhouse_common_io PUBLIC ${MLPACK_LIBRARIES} ${ARMADILLO_LIBRARIES}) +dbms_target_include_directories(PUBLIC ${MLPACK_INCLUDE_DIRS} ${ARMADILLO_INCLUDE_DIRS}) + +target_link_libraries(${MLPACK_LIBRARIES} INTERFACE ${ARMADILLO_LIBRARIES}) + +target_link_libraries(clickhouse_common_io PUBLIC _boost_serialization) +# dbms_target_include_directories(PUBLIC _boost_serialization) +# if (TARGET ${MLPACK_LIBRARIES}) +# add_headers_and_sources(dbms Storages/MLpack) +# endif() + dbms_target_include_directories (PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") target_include_directories (clickhouse_common_io PUBLIC "${ClickHouse_SOURCE_DIR}/src" "${ClickHouse_BINARY_DIR}/src") @@ -553,12 +573,6 @@ if (TARGET ch_contrib::rapidjson) dbms_target_link_libraries(PRIVATE ch_contrib::rapidjson) endif() -# if (TARGET mlpack) -# dbms_target_link_libraries(PUBLIC mlpack) -# endif() -dbms_target_link_libraries(PRIVATE mlpack) -target_link_libraries (clickhouse_common_io PRIVATE mlpack) - dbms_target_link_libraries(PUBLIC ch_contrib::consistent_hashing) include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake") diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index eb84e24b7136..767e5ef435f5 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -624,6 +624,10 @@ M(653, CANNOT_PARSE_BACKUP_SETTINGS) \ M(654, WRONG_BACKUP_SETTINGS) \ M(655, FAILED_TO_RESTORE_METADATA_ON_OTHER_NODE) \ + M(656, CANNOT_SAVE_MLPACK_MODEL) \ + M(657, INSUFFICIENT_NUMBER_OF_COLUMNS) \ + M(658, CANNOT_CONVERT_TO_FLOAT64) \ + M(659, FEATURE_BUFFERS_EMPTY)\ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Storages/MLpack/LinRegSettings.cpp b/src/Storages/MLpack/LinRegSettings.cpp new file mode 100644 index 000000000000..ce7e18172427 --- /dev/null +++ b/src/Storages/MLpack/LinRegSettings.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(MLpackSettingsTraits, LIST_OF_MLPACK_SETTINGS) + +void LinRegSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage_def.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} +} diff --git a/src/Storages/MLpack/LinRegSettings.h b/src/Storages/MLpack/LinRegSettings.h new file mode 100644 index 000000000000..39974e57019b --- /dev/null +++ b/src/Storages/MLpack/LinRegSettings.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include + +namespace DB +{ + class ASTStorage; + + +#define MLPACK_RELATED_SETTINGS(M) \ + M(Float, lambda, 0.0, "Regularization coefficient.", 0) \ + +#define LIST_OF_MLPACK_SETTINGS(M) \ + MLPACK_RELATED_SETTINGS(M) + // FORMAT_FACTORY_SETTINGS(M) + +DECLARE_SETTINGS_TRAITS(MLpackSettingsTraits, LIST_OF_MLPACK_SETTINGS) + +struct LinRegSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; +} diff --git a/src/Storages/MLpack/StorageLinReg.cpp b/src/Storages/MLpack/StorageLinReg.cpp new file mode 100644 index 000000000000..2cb6fee8f89a --- /dev/null +++ b/src/Storages/MLpack/StorageLinReg.cpp @@ -0,0 +1,207 @@ +#include + +// #include +#include +#include +#include +#include +// #include +// #include +#include +// #include +#include +// #include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CANNOT_SAVE_MLPACK_MODEL; + extern const int INSUFFICIENT_NUMBER_OF_COLUMNS; + extern const int CANNOT_CONVERT_TO_FLOAT64; + extern const int FEATURE_BUFFERS_EMPTY; +} + + +class StorageLinRegSink : public SinkToStorage +{ +public: + StorageLinRegSink( + const Block & sample_block_, + String filepath_, + std::shared_ptr model_) + : SinkToStorage(sample_block_) + , sample_block(sample_block_) + , filepath(filepath_) + , model(model_) + { + feature_buffers.resize(sample_block.columns()); + } + + String getName() const override { return "StorageLinRegSink"; } + + void consume(Chunk chunk) override + { + if (!chunk.hasRows()) + return; + + if (chunk.getNumColumns() < 2) + throw Exception( + "there must be at least 2 columns.", + ErrorCodes::INSUFFICIENT_NUMBER_OF_COLUMNS); + + auto& columns = chunk.getColumns(); + + for (size_t i = 0; i < chunk.getNumColumns(); ++i) + { + auto ptr = typeid_cast(columns[i].get()); + if (ptr == nullptr) + throw Exception( + "columns must be float64.", + ErrorCodes::CANNOT_CONVERT_TO_FLOAT64); + + for (size_t j = 0; j < chunk.getNumRows(); ++j) + { + feature_buffers[i].push_back(ptr->getFloat64(j)); + } + } + + } + + void fillFeatureBuffer(const std::vector> & columns, size_t n_rows, size_t n_features, Float64* buffer) const + { + size_t offset = 0; + for (size_t i = 0; i < n_features; ++i) + { + for (size_t j = 0; j < n_rows; ++j) + { + buffer[offset + j] = columns[i][j]; + } + offset += n_rows; + } + } + + void onFinish() override + { + if (feature_buffers.empty()) + throw Exception( + "your podarray vector is empty.", + ErrorCodes::FEATURE_BUFFERS_EMPTY); + + if (feature_buffers.front().empty()) + return; // no data to train on + size_t n_features = feature_buffers.size() - 1; + size_t n_rows = feature_buffers.front().size(); + PODArray feature_vector(n_features * n_rows); + auto * buffer = feature_vector.data(); + + fillFeatureBuffer(feature_buffers, n_rows, n_features, buffer); + + arma::mat regressors(buffer, n_rows, n_features); + regressors = regressors.t(); + + arma::rowvec target; + target = arma::rowvec(feature_buffers.back().data(), n_rows); + + this->model->Train(regressors, target); + + bool save_status = mlpack::data::Save(this->filepath, "sraka", *this->model); + if (!save_status) + throw Exception( + "cannot save model.", + ErrorCodes::CANNOT_SAVE_MLPACK_MODEL); + + } + +private: + Block sample_block; + std::vector> feature_buffers; + // std::vector> memory_buffers; + String filepath; + std::shared_ptr model; +}; + +StorageLinReg::StorageLinReg( + const StorageID & table_id_, + std::unique_ptr linreg_settings_, + const String filepath_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment) + : IStorage(table_id_) + , filepath(filepath_) + , linreg_settings(std::move(linreg_settings_)) +{ + StorageInMemoryMetadata storage_metadata; // отсавляем как есть + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + // if (linreg_settings== nullptr) + // LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA SETTINGS {}\n\n\n\n\n", "status"); + + + model = std::make_shared(); + // if (model== nullptr) + // LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA MODEL {}\n\n\n\n\n", "status"); + + model->Lambda() = linreg_settings->lambda.value; +} + +SinkToStoragePtr StorageLinReg::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) +{ + auto sample_block = metadata_snapshot->getSampleBlock(); + + return std::make_shared( + sample_block, + this->filepath, + this->model); +} + + +LinRegConfiguration StorageLinReg::getConfiguration(ASTs engine_args, ContextPtr context) +{ + LinRegConfiguration configuration; + + if (engine_args.size() != 1) + throw Exception( + "wrong number of parameters.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + configuration.filepath = engine_args[0]->as().value.safeGet(); + return configuration; +} + + +void registerStorageLinReg(StorageFactory & factory) +{ + factory.registerStorage("LinReg", [](const StorageFactory::Arguments & args) + { + auto linreg_settings = std::make_unique(); + linreg_settings->loadFromQuery(*args.storage_def); + auto configuration = StorageLinReg::getConfiguration(args.engine_args, args.getLocalContext()); + + return std::make_shared( + args.table_id, + std::move(linreg_settings), + configuration.filepath, + args.columns, + args.constraints, + args.comment); + }, + { + .supports_settings = true, + .source_access_type = AccessType::MLPACK, + }); + +} + +} diff --git a/src/Storages/MLpack/StorageLinReg.h b/src/Storages/MLpack/StorageLinReg.h new file mode 100644 index 000000000000..ec385215c965 --- /dev/null +++ b/src/Storages/MLpack/StorageLinReg.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include +#include +// #include "LinRegSettings.h" + +namespace DB +{ + +struct LinRegConfiguration +{ + String filepath; +}; + +/* Implements storage in the MongoDB database. + * Use ENGINE = mysql(host_port, database_name, table_name, user_name, password) + * Read only. + */ + +class StorageLinReg final : public IStorage +{ +public: + StorageLinReg( + const StorageID & table_id_, + std::unique_ptr linreg_settings_, + const String filepath, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment); + + std::string getName() const override { return "LinReg"; } + + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; + + static LinRegConfiguration getConfiguration(ASTs engine_args, ContextPtr context); + +private: + const String filepath; + std::unique_ptr linreg_settings; + std::shared_ptr model; +}; + +} diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index f567bf6eefca..1048f0a86f0c 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -27,6 +27,7 @@ void registerStorageLiveView(StorageFactory & factory); void registerStorageGenerateRandom(StorageFactory & factory); void registerStorageExecutable(StorageFactory & factory); void registerStorageWindowView(StorageFactory & factory); +void registerStorageLinReg(StorageFactory & factory); #if USE_AWS_S3 void registerStorageS3(StorageFactory & factory); @@ -104,6 +105,7 @@ void registerStorages() registerStorageGenerateRandom(factory); registerStorageExecutable(factory); registerStorageWindowView(factory); + registerStorageLinReg(factory); #if USE_AWS_S3 registerStorageS3(factory); From 82ca98750f5623fb38f3ae9533b4237130119de5 Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Mon, 16 May 2022 00:16:04 +0000 Subject: [PATCH 03/11] add abstract class --- src/Storages/MLpack/IModel.h | 183 +++++++++++++++++++ src/Storages/MLpack/ModelSettings.cpp | 39 ++++ src/Storages/MLpack/ModelSettings.h | 25 +++ src/Storages/MLpack/StorageMLmodel.cpp | 242 +++++++++++++++++++++++++ src/Storages/MLpack/StorageMLmodel.h | 49 +++++ src/Storages/registerStorages.cpp | 6 + 6 files changed, 544 insertions(+) create mode 100644 src/Storages/MLpack/IModel.h create mode 100644 src/Storages/MLpack/ModelSettings.cpp create mode 100644 src/Storages/MLpack/ModelSettings.h create mode 100644 src/Storages/MLpack/StorageMLmodel.cpp create mode 100644 src/Storages/MLpack/StorageMLmodel.h diff --git a/src/Storages/MLpack/IModel.h b/src/Storages/MLpack/IModel.h new file mode 100644 index 000000000000..6bc41654e323 --- /dev/null +++ b/src/Storages/MLpack/IModel.h @@ -0,0 +1,183 @@ +#pragma once + +// #include +// #include +// #include +// #include +// #include +// #include +// #include +#include +#include +#include +#include +#include +#include +#include +#include + +// namespace ErrorCodes +// { +// extern const int CANNOT_SAVE_MLPACK_MODEL; +// } + +namespace DB +{ + +struct TrainResult {} + +struct LinearRegressionResult : public TrainResult +{ + mlpack::regression::LinearRegression model; +}; + +struct LogisticRegressionResult : public TrainResult +{ + mlpack::regression::LogisticRegression model; +}; + +struct LinearSVMResult : public TrainResult +{ + mlpack::svm::LinearSVM model; +}; + + +class IModel +{ +public: + IModel() = default; + virtual ~IModel() = default; + + virtual String GetName() const = 0; + + virtual const TrainResult& GetModel() const = 0; + + virtual void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; + + virtual bool Save(String filepath) = 0; + +}; + +class LinReg: public IModel +{ +public: + LinReg( + std::unique_ptr model_settings) + : IModel() + { + model.Lambda() = model_settings->lambda.value; + } + + String GetName() const { return "LinearRegression"; } + + const TrainResult& GetModel() const { return LinearRegressionResult{model}; } + + void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override + { + arma::rowvec target = arma::rowvec(target_ptr, n_rows); + model.Train(regressors, target); + } + + bool Save(String filepath) override + { + return mlpack::data::Save(filepath, "linreg", model); + } + +protected: + mlpack::regression::LinearRegression model; +}; + +class LogReg: public IModel +{ +public: + LogReg( + std::unique_ptr model_settings) + : IModel() + { + model.Lambda() = model_settings->lambda.value; + // auto opt_type = model_settings->optimizer.value; + } + + String GetName() const { return "LogisticRegression"; } + + const TrainResult& GetModel() const { return LogisticRegressionResult{model}; } + + // void Train(const arma::mat& regressors, const arma::Row& target) override { + // ens::L_BFGS lbfgsOpt; + + // model.Train(regressors, target, lbfgsOpt); + // } + + void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { + PODArray modified_target(n_rows); + for (size_t i = 0; i < n_rows; ++i) + { + modified_target[i] = static_cast(target_ptr[i]); + } + // size_t * modified_ptr = reinterpret_cast(target_ptr); + arma::Row target = arma::Row(modified_target.data(), n_rows); + model.Train(regressors, target); + } + + bool Save(String filepath) override + { + return mlpack::data::Save(filepath, "logreg", model); + } + +protected: + mlpack::regression::LogisticRegression<> model; + +}; + +class LinearSVM: public IModel +{ +public: + LinearSVM( + std::unique_ptr model_settings) + : IModel() + { + model.Lambda() = model_settings->lambda.value; + model.Delta() = model_settings->delta.value; + } + + String GetName() const { return "LinearSVM"; } + + const TrainResult& GetModel() const { return LinearSVMResult{model}; } + + // void Train(const arma::mat& regressors, const arma::Row& target) override { + // ens::L_BFGS lbfgsOpt; + // model.Train(regressors, target, 2, lbfgsOpt); + // } + + void TrainArb(const arma::mat& regressors, Float64 * /*target_ptr*/, size_t n_rows) override + { + // PODArray modified_target; + // modified_target.resize(n_rows); + // if (target_ptr == nullptr) + // LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA target ptr {}\n\n\n\n\n", "status"); + + + // for (size_t i = 0; i < n_rows; ++i) + // { + // modified_target[i] = static_cast(target_ptr[i]); + // } + // size_t * modified_ptr = reinterpret_cast(target_ptr); + PODArray pipa(n_rows, 1); + arma::Row target = arma::Row(pipa.data(), n_rows); + // arma::Row target = arma::Row(modified_target.data(), n_rows); + ens::L_BFGS lbfgsOpt; + model.Train(regressors, target, 2, lbfgsOpt); + } + + bool Save(String filepath) override + { + return mlpack::data::Save(filepath, "linearsvm", model); + } + +protected: + mlpack::svm::LinearSVM<> model; +}; + +using IModelPtr = std::shared_ptr; + +} diff --git a/src/Storages/MLpack/ModelSettings.cpp b/src/Storages/MLpack/ModelSettings.cpp new file mode 100644 index 000000000000..1a3a6f778c0c --- /dev/null +++ b/src/Storages/MLpack/ModelSettings.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(ModelSettingsTraits, LIST_OF_MODEL_SETTINGS) + +void ModelSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage_def.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} +} diff --git a/src/Storages/MLpack/ModelSettings.h b/src/Storages/MLpack/ModelSettings.h new file mode 100644 index 000000000000..ec69f2d6c0a7 --- /dev/null +++ b/src/Storages/MLpack/ModelSettings.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +namespace DB +{ + class ASTStorage; + + +#define MODEL_RELATED_SETTINGS(M) \ + M(Float, lambda, 0.0, "Regularization coefficient.", 0) \ + M(String, optimizer, "lbfgs", "Optimizer.", 0) \ + M(Float, delta, 1.0, "Margin of difference between correct class and other classes.", 0) \ + +#define LIST_OF_MODEL_SETTINGS(M) \ + MODEL_RELATED_SETTINGS(M) + +DECLARE_SETTINGS_TRAITS(ModelSettingsTraits, LIST_OF_MODEL_SETTINGS) + +struct ModelSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; +} diff --git a/src/Storages/MLpack/StorageMLmodel.cpp b/src/Storages/MLpack/StorageMLmodel.cpp new file mode 100644 index 000000000000..fa210fbee9b6 --- /dev/null +++ b/src/Storages/MLpack/StorageMLmodel.cpp @@ -0,0 +1,242 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int CANNOT_SAVE_MLPACK_MODEL; + extern const int INSUFFICIENT_NUMBER_OF_COLUMNS; + extern const int CANNOT_CONVERT_TO_FLOAT64; + extern const int FEATURE_BUFFERS_EMPTY; +} + +class StorageMLmodelSink : public SinkToStorage +{ +public: + StorageMLmodelSink( + const Block & sample_block_, + String filepath_, + IModelPtr model_) + : SinkToStorage(sample_block_) + , sample_block(sample_block_) + , filepath(filepath_) + , model(model_) + { + feature_buffers.resize(sample_block.columns()); + } + + String getName() const override { return "StorageMLmodelSink"; } + + void consume(Chunk chunk) override + { + if (!chunk.hasRows()) + return; + + if (chunk.getNumColumns() < 2) + throw Exception( + "there must be at least 2 columns.", + ErrorCodes::INSUFFICIENT_NUMBER_OF_COLUMNS); + + auto& columns = chunk.getColumns(); + + for (size_t i = 0; i < chunk.getNumColumns(); ++i) + { + auto ptr = typeid_cast(columns[i].get()); + if (ptr == nullptr) + throw Exception( + "columns must be float64.", + ErrorCodes::CANNOT_CONVERT_TO_FLOAT64); + + for (size_t j = 0; j < chunk.getNumRows(); ++j) + { + feature_buffers[i].push_back(ptr->getFloat64(j)); + } + } + + } + + void fillFeatureBuffer(const std::vector> & columns, size_t n_rows, size_t n_features, Float64* buffer) const + { + size_t offset = 0; + for (size_t i = 0; i < n_features; ++i) + { + for (size_t j = 0; j < n_rows; ++j) + { + buffer[offset + j] = columns[i][j]; + } + offset += n_rows; + } + } + + void onFinish() override + { + if (feature_buffers.empty()) + throw Exception( + "your podarray vector is empty.", + ErrorCodes::FEATURE_BUFFERS_EMPTY); + + if (feature_buffers.front().empty()) + return; // no data to train on + size_t n_features = feature_buffers.size() - 1; + size_t n_rows = feature_buffers.front().size(); + PODArray feature_vector(n_features * n_rows); + auto * buffer = feature_vector.data(); + + fillFeatureBuffer(feature_buffers, n_rows, n_features, buffer); + + arma::mat regressors(buffer, n_rows, n_features); + regressors = regressors.t(); + + if (model == nullptr) + LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA MODEL {}\n\n\n\n\n", "status"); + + model->TrainArb(regressors, feature_buffers.back().data(), n_rows); + // shady + + bool save_status = model->Save(this->filepath); + if (!save_status) + throw Exception( + "cannot save model.", + ErrorCodes::CANNOT_SAVE_MLPACK_MODEL); + } + +private: + Block sample_block; + std::vector> feature_buffers; + // std::vector> memory_buffers; + String filepath; + IModelPtr model; +}; + +StorageMLmodel::StorageMLmodel( + const StorageID & table_id_, + IModelPtr model_, + const String filepath_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment) + : IStorage(table_id_) + , filepath(filepath_) + , model(model_) +{ + StorageInMemoryMetadata storage_metadata; // отсавляем как есть + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); +} + +SinkToStoragePtr StorageMLmodel::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) +{ + auto sample_block = metadata_snapshot->getSampleBlock(); + + return std::make_shared( + sample_block, + this->filepath, + this->model); +} + + +MLmodelConfiguration StorageMLmodel::getConfiguration(ASTs engine_args, ContextPtr context) +{ + MLmodelConfiguration configuration; + + if (engine_args.size() != 1) + throw Exception( + "wrong number of parameters.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + configuration.filepath = engine_args[0]->as().value.safeGet(); + return configuration; +} + + +// make for each storage + +void registerStorageLinRegg(StorageFactory & factory) +{ + factory.registerStorage("LinRegg", [](const StorageFactory::Arguments & args) + { + auto linreg_settings = std::make_unique(); + linreg_settings->loadFromQuery(*args.storage_def); + auto configuration = StorageMLmodel::getConfiguration(args.engine_args, args.getLocalContext()); + auto model = std::make_shared(std::move(linreg_settings)); + + return std::make_shared( + args.table_id, + model, + configuration.filepath, + args.columns, + args.constraints, + args.comment); + }, + { + .supports_settings = true, + .source_access_type = AccessType::MLPACK, + }); + +} + +void registerStorageLogReg(StorageFactory & factory) +{ + factory.registerStorage("LogReg", [](const StorageFactory::Arguments & args) + { + auto linreg_settings = std::make_unique(); + linreg_settings->loadFromQuery(*args.storage_def); + auto configuration = StorageMLmodel::getConfiguration(args.engine_args, args.getLocalContext()); + auto model = std::make_shared(std::move(linreg_settings)); + + return std::make_shared( + args.table_id, + model, + configuration.filepath, + args.columns, + args.constraints, + args.comment); + }, + { + .supports_settings = true, + .source_access_type = AccessType::MLPACK, + }); + +} + +void registerStorageLinearSVM(StorageFactory & factory) +{ + factory.registerStorage("LinearSVM", [](const StorageFactory::Arguments & args) + { + auto linreg_settings = std::make_unique(); + linreg_settings->loadFromQuery(*args.storage_def); + auto configuration = StorageMLmodel::getConfiguration(args.engine_args, args.getLocalContext()); + auto model = std::make_shared(std::move(linreg_settings)); + + return std::make_shared( + args.table_id, + model, + configuration.filepath, + args.columns, + args.constraints, + args.comment); + }, + { + .supports_settings = true, + .source_access_type = AccessType::MLPACK, + }); + +} + +} diff --git a/src/Storages/MLpack/StorageMLmodel.h b/src/Storages/MLpack/StorageMLmodel.h new file mode 100644 index 000000000000..58c9a02736e3 --- /dev/null +++ b/src/Storages/MLpack/StorageMLmodel.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +// #include "LinRegSettings.h" + +namespace DB +{ + +struct MLmodelConfiguration +{ + String filepath; +}; + +/* Implements storage in the MongoDB database. + * Use ENGINE = mysql(host_port, database_name, table_name, user_name, password) + * Read only. + */ + +class StorageMLmodel final : public IStorage +{ +public: + StorageMLmodel( + const StorageID & table_id_, + IModelPtr model_, + const String filepath, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment); + + std::string getName() const override { return "MLmodel"; } + + // maybe check on null? + const TrainResult& getModel() const { model->GetModel(); } + + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; + + static MLmodelConfiguration getConfiguration(ASTs engine_args, ContextPtr context); + +private: + const String filepath; + IModelPtr model; +}; + +} diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 1048f0a86f0c..c59e7a664603 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -28,6 +28,9 @@ void registerStorageGenerateRandom(StorageFactory & factory); void registerStorageExecutable(StorageFactory & factory); void registerStorageWindowView(StorageFactory & factory); void registerStorageLinReg(StorageFactory & factory); +void registerStorageLinRegg(StorageFactory & factory); +void registerStorageLogReg(StorageFactory & factory); +void registerStorageLinearSVM(StorageFactory & factory); #if USE_AWS_S3 void registerStorageS3(StorageFactory & factory); @@ -106,6 +109,9 @@ void registerStorages() registerStorageExecutable(factory); registerStorageWindowView(factory); registerStorageLinReg(factory); + registerStorageLinRegg(factory); + registerStorageLogReg(factory); + registerStorageLinearSVM(factory); #if USE_AWS_S3 registerStorageS3(factory); From dbd875248e698732190387412be208547a8b2bbe Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Mon, 16 May 2022 00:25:28 +0000 Subject: [PATCH 04/11] fix --- src/Storages/MLpack/StorageMLmodel.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Storages/MLpack/StorageMLmodel.h b/src/Storages/MLpack/StorageMLmodel.h index 58c9a02736e3..937b4688b745 100644 --- a/src/Storages/MLpack/StorageMLmodel.h +++ b/src/Storages/MLpack/StorageMLmodel.h @@ -37,6 +37,9 @@ class StorageMLmodel final : public IStorage // maybe check on null? const TrainResult& getModel() const { model->GetModel(); } + // check on null? + String getModelName() const { model->GetName(); } + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; static MLmodelConfiguration getConfiguration(ASTs engine_args, ContextPtr context); From 7c1f1b11578ada01424716ceb713723f64139c3e Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Mon, 16 May 2022 11:37:44 +0000 Subject: [PATCH 05/11] change train result structure --- src/Storages/MLpack/IModel.h | 22 +++++++++++----------- src/Storages/MLpack/StorageMLmodel.h | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Storages/MLpack/IModel.h b/src/Storages/MLpack/IModel.h index 6bc41654e323..f29266120b54 100644 --- a/src/Storages/MLpack/IModel.h +++ b/src/Storages/MLpack/IModel.h @@ -24,21 +24,21 @@ namespace DB { -struct TrainResult {} +struct TrainResult {}; struct LinearRegressionResult : public TrainResult { - mlpack::regression::LinearRegression model; + const mlpack::regression::LinearRegression& model; }; struct LogisticRegressionResult : public TrainResult { - mlpack::regression::LogisticRegression model; + const mlpack::regression::LogisticRegression<>& model; }; struct LinearSVMResult : public TrainResult { - mlpack::svm::LinearSVM model; + const mlpack::svm::LinearSVM<>& model; }; @@ -50,7 +50,7 @@ class IModel virtual String GetName() const = 0; - virtual const TrainResult& GetModel() const = 0; + virtual TrainResult GetModel() const = 0; virtual void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; @@ -68,9 +68,9 @@ class LinReg: public IModel model.Lambda() = model_settings->lambda.value; } - String GetName() const { return "LinearRegression"; } + String GetName() const override { return "LinearRegression"; } - const TrainResult& GetModel() const { return LinearRegressionResult{model}; } + TrainResult GetModel() const override { return LinearRegressionResult{{}, model}; } void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { @@ -98,9 +98,9 @@ class LogReg: public IModel // auto opt_type = model_settings->optimizer.value; } - String GetName() const { return "LogisticRegression"; } + String GetName() const override { return "LogisticRegression"; } - const TrainResult& GetModel() const { return LogisticRegressionResult{model}; } + TrainResult GetModel() const override { return LogisticRegressionResult{{}, model}; } // void Train(const arma::mat& regressors, const arma::Row& target) override { // ens::L_BFGS lbfgsOpt; @@ -140,9 +140,9 @@ class LinearSVM: public IModel model.Delta() = model_settings->delta.value; } - String GetName() const { return "LinearSVM"; } + String GetName() const override { return "LinearSVM"; } - const TrainResult& GetModel() const { return LinearSVMResult{model}; } + TrainResult GetModel() const override { return LinearSVMResult{{}, model}; } // void Train(const arma::mat& regressors, const arma::Row& target) override { // ens::L_BFGS lbfgsOpt; diff --git a/src/Storages/MLpack/StorageMLmodel.h b/src/Storages/MLpack/StorageMLmodel.h index 937b4688b745..f648c90b60c9 100644 --- a/src/Storages/MLpack/StorageMLmodel.h +++ b/src/Storages/MLpack/StorageMLmodel.h @@ -35,10 +35,10 @@ class StorageMLmodel final : public IStorage std::string getName() const override { return "MLmodel"; } // maybe check on null? - const TrainResult& getModel() const { model->GetModel(); } + TrainResult getModel() const { return model->GetModel(); } // check on null? - String getModelName() const { model->GetName(); } + String getModelName() const { return model->GetName(); } SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; From c74effc3fa9d768050e34b932422ac125615bff5 Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Mon, 16 May 2022 21:09:57 +0000 Subject: [PATCH 06/11] remove previous storage architecture --- src/Storages/MLpack/IModel.h | 30 +--- src/Storages/MLpack/LinRegSettings.cpp | 39 ----- src/Storages/MLpack/LinRegSettings.h | 24 --- src/Storages/MLpack/StorageLinReg.cpp | 207 ------------------------- src/Storages/MLpack/StorageLinReg.h | 46 ------ src/Storages/MLpack/StorageMLmodel.cpp | 9 +- src/Storages/MLpack/StorageMLmodel.h | 1 - src/Storages/registerStorages.cpp | 2 - 8 files changed, 7 insertions(+), 351 deletions(-) delete mode 100644 src/Storages/MLpack/LinRegSettings.cpp delete mode 100644 src/Storages/MLpack/LinRegSettings.h delete mode 100644 src/Storages/MLpack/StorageLinReg.cpp delete mode 100644 src/Storages/MLpack/StorageLinReg.h diff --git a/src/Storages/MLpack/IModel.h b/src/Storages/MLpack/IModel.h index f29266120b54..90304e757420 100644 --- a/src/Storages/MLpack/IModel.h +++ b/src/Storages/MLpack/IModel.h @@ -1,12 +1,5 @@ #pragma once -// #include -// #include -// #include -// #include -// #include -// #include -// #include #include #include #include @@ -16,10 +9,6 @@ #include #include -// namespace ErrorCodes -// { -// extern const int CANNOT_SAVE_MLPACK_MODEL; -// } namespace DB { @@ -52,7 +41,7 @@ class IModel virtual TrainResult GetModel() const = 0; - virtual void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; + virtual void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; virtual bool Save(String filepath) = 0; @@ -72,7 +61,7 @@ class LinReg: public IModel TrainResult GetModel() const override { return LinearRegressionResult{{}, model}; } - void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override + void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { arma::rowvec target = arma::rowvec(target_ptr, n_rows); model.Train(regressors, target); @@ -102,13 +91,7 @@ class LogReg: public IModel TrainResult GetModel() const override { return LogisticRegressionResult{{}, model}; } - // void Train(const arma::mat& regressors, const arma::Row& target) override { - // ens::L_BFGS lbfgsOpt; - - // model.Train(regressors, target, lbfgsOpt); - // } - - void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { + void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { PODArray modified_target(n_rows); for (size_t i = 0; i < n_rows; ++i) { @@ -144,12 +127,7 @@ class LinearSVM: public IModel TrainResult GetModel() const override { return LinearSVMResult{{}, model}; } - // void Train(const arma::mat& regressors, const arma::Row& target) override { - // ens::L_BFGS lbfgsOpt; - // model.Train(regressors, target, 2, lbfgsOpt); - // } - - void TrainArb(const arma::mat& regressors, Float64 * /*target_ptr*/, size_t n_rows) override + void Train(const arma::mat& regressors, Float64 * /*target_ptr*/, size_t n_rows) override { // PODArray modified_target; // modified_target.resize(n_rows); diff --git a/src/Storages/MLpack/LinRegSettings.cpp b/src/Storages/MLpack/LinRegSettings.cpp deleted file mode 100644 index ce7e18172427..000000000000 --- a/src/Storages/MLpack/LinRegSettings.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int UNKNOWN_SETTING; -} - -IMPLEMENT_SETTINGS_TRAITS(MLpackSettingsTraits, LIST_OF_MLPACK_SETTINGS) - -void LinRegSettings::loadFromQuery(ASTStorage & storage_def) -{ - if (storage_def.settings) - { - try - { - applyChanges(storage_def.settings->changes); - } - catch (Exception & e) - { - if (e.code() == ErrorCodes::UNKNOWN_SETTING) - e.addMessage("for storage " + storage_def.engine->name); - throw; - } - } - else - { - auto settings_ast = std::make_shared(); - settings_ast->is_standalone = false; - storage_def.set(storage_def.settings, settings_ast); - } -} -} diff --git a/src/Storages/MLpack/LinRegSettings.h b/src/Storages/MLpack/LinRegSettings.h deleted file mode 100644 index 39974e57019b..000000000000 --- a/src/Storages/MLpack/LinRegSettings.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - class ASTStorage; - - -#define MLPACK_RELATED_SETTINGS(M) \ - M(Float, lambda, 0.0, "Regularization coefficient.", 0) \ - -#define LIST_OF_MLPACK_SETTINGS(M) \ - MLPACK_RELATED_SETTINGS(M) - // FORMAT_FACTORY_SETTINGS(M) - -DECLARE_SETTINGS_TRAITS(MLpackSettingsTraits, LIST_OF_MLPACK_SETTINGS) - -struct LinRegSettings : public BaseSettings -{ - void loadFromQuery(ASTStorage & storage_def); -}; -} diff --git a/src/Storages/MLpack/StorageLinReg.cpp b/src/Storages/MLpack/StorageLinReg.cpp deleted file mode 100644 index 2cb6fee8f89a..000000000000 --- a/src/Storages/MLpack/StorageLinReg.cpp +++ /dev/null @@ -1,207 +0,0 @@ -#include - -// #include -#include -#include -#include -#include -// #include -// #include -#include -// #include -#include -// #include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int CANNOT_SAVE_MLPACK_MODEL; - extern const int INSUFFICIENT_NUMBER_OF_COLUMNS; - extern const int CANNOT_CONVERT_TO_FLOAT64; - extern const int FEATURE_BUFFERS_EMPTY; -} - - -class StorageLinRegSink : public SinkToStorage -{ -public: - StorageLinRegSink( - const Block & sample_block_, - String filepath_, - std::shared_ptr model_) - : SinkToStorage(sample_block_) - , sample_block(sample_block_) - , filepath(filepath_) - , model(model_) - { - feature_buffers.resize(sample_block.columns()); - } - - String getName() const override { return "StorageLinRegSink"; } - - void consume(Chunk chunk) override - { - if (!chunk.hasRows()) - return; - - if (chunk.getNumColumns() < 2) - throw Exception( - "there must be at least 2 columns.", - ErrorCodes::INSUFFICIENT_NUMBER_OF_COLUMNS); - - auto& columns = chunk.getColumns(); - - for (size_t i = 0; i < chunk.getNumColumns(); ++i) - { - auto ptr = typeid_cast(columns[i].get()); - if (ptr == nullptr) - throw Exception( - "columns must be float64.", - ErrorCodes::CANNOT_CONVERT_TO_FLOAT64); - - for (size_t j = 0; j < chunk.getNumRows(); ++j) - { - feature_buffers[i].push_back(ptr->getFloat64(j)); - } - } - - } - - void fillFeatureBuffer(const std::vector> & columns, size_t n_rows, size_t n_features, Float64* buffer) const - { - size_t offset = 0; - for (size_t i = 0; i < n_features; ++i) - { - for (size_t j = 0; j < n_rows; ++j) - { - buffer[offset + j] = columns[i][j]; - } - offset += n_rows; - } - } - - void onFinish() override - { - if (feature_buffers.empty()) - throw Exception( - "your podarray vector is empty.", - ErrorCodes::FEATURE_BUFFERS_EMPTY); - - if (feature_buffers.front().empty()) - return; // no data to train on - size_t n_features = feature_buffers.size() - 1; - size_t n_rows = feature_buffers.front().size(); - PODArray feature_vector(n_features * n_rows); - auto * buffer = feature_vector.data(); - - fillFeatureBuffer(feature_buffers, n_rows, n_features, buffer); - - arma::mat regressors(buffer, n_rows, n_features); - regressors = regressors.t(); - - arma::rowvec target; - target = arma::rowvec(feature_buffers.back().data(), n_rows); - - this->model->Train(regressors, target); - - bool save_status = mlpack::data::Save(this->filepath, "sraka", *this->model); - if (!save_status) - throw Exception( - "cannot save model.", - ErrorCodes::CANNOT_SAVE_MLPACK_MODEL); - - } - -private: - Block sample_block; - std::vector> feature_buffers; - // std::vector> memory_buffers; - String filepath; - std::shared_ptr model; -}; - -StorageLinReg::StorageLinReg( - const StorageID & table_id_, - std::unique_ptr linreg_settings_, - const String filepath_, - const ColumnsDescription & columns_, - const ConstraintsDescription & constraints_, - const String & comment) - : IStorage(table_id_) - , filepath(filepath_) - , linreg_settings(std::move(linreg_settings_)) -{ - StorageInMemoryMetadata storage_metadata; // отсавляем как есть - storage_metadata.setColumns(columns_); - storage_metadata.setConstraints(constraints_); - storage_metadata.setComment(comment); - setInMemoryMetadata(storage_metadata); - - // if (linreg_settings== nullptr) - // LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA SETTINGS {}\n\n\n\n\n", "status"); - - - model = std::make_shared(); - // if (model== nullptr) - // LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA MODEL {}\n\n\n\n\n", "status"); - - model->Lambda() = linreg_settings->lambda.value; -} - -SinkToStoragePtr StorageLinReg::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) -{ - auto sample_block = metadata_snapshot->getSampleBlock(); - - return std::make_shared( - sample_block, - this->filepath, - this->model); -} - - -LinRegConfiguration StorageLinReg::getConfiguration(ASTs engine_args, ContextPtr context) -{ - LinRegConfiguration configuration; - - if (engine_args.size() != 1) - throw Exception( - "wrong number of parameters.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - for (auto & engine_arg : engine_args) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - configuration.filepath = engine_args[0]->as().value.safeGet(); - return configuration; -} - - -void registerStorageLinReg(StorageFactory & factory) -{ - factory.registerStorage("LinReg", [](const StorageFactory::Arguments & args) - { - auto linreg_settings = std::make_unique(); - linreg_settings->loadFromQuery(*args.storage_def); - auto configuration = StorageLinReg::getConfiguration(args.engine_args, args.getLocalContext()); - - return std::make_shared( - args.table_id, - std::move(linreg_settings), - configuration.filepath, - args.columns, - args.constraints, - args.comment); - }, - { - .supports_settings = true, - .source_access_type = AccessType::MLPACK, - }); - -} - -} diff --git a/src/Storages/MLpack/StorageLinReg.h b/src/Storages/MLpack/StorageLinReg.h deleted file mode 100644 index ec385215c965..000000000000 --- a/src/Storages/MLpack/StorageLinReg.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -// #include "LinRegSettings.h" - -namespace DB -{ - -struct LinRegConfiguration -{ - String filepath; -}; - -/* Implements storage in the MongoDB database. - * Use ENGINE = mysql(host_port, database_name, table_name, user_name, password) - * Read only. - */ - -class StorageLinReg final : public IStorage -{ -public: - StorageLinReg( - const StorageID & table_id_, - std::unique_ptr linreg_settings_, - const String filepath, - const ColumnsDescription & columns_, - const ConstraintsDescription & constraints_, - const String & comment); - - std::string getName() const override { return "LinReg"; } - - SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; - - static LinRegConfiguration getConfiguration(ASTs engine_args, ContextPtr context); - -private: - const String filepath; - std::unique_ptr linreg_settings; - std::shared_ptr model; -}; - -} diff --git a/src/Storages/MLpack/StorageMLmodel.cpp b/src/Storages/MLpack/StorageMLmodel.cpp index fa210fbee9b6..69c80aeb5c2b 100644 --- a/src/Storages/MLpack/StorageMLmodel.cpp +++ b/src/Storages/MLpack/StorageMLmodel.cpp @@ -98,10 +98,7 @@ class StorageMLmodelSink : public SinkToStorage arma::mat regressors(buffer, n_rows, n_features); regressors = regressors.t(); - if (model == nullptr) - LOG_FATAL(&Poco::Logger::root(), "\n\n\n\n\n ZHOPA MODEL {}\n\n\n\n\n", "status"); - - model->TrainArb(regressors, feature_buffers.back().data(), n_rows); + model->Train(regressors, feature_buffers.back().data(), n_rows); // shady bool save_status = model->Save(this->filepath); @@ -167,9 +164,9 @@ MLmodelConfiguration StorageMLmodel::getConfiguration(ASTs engine_args, ContextP // make for each storage -void registerStorageLinRegg(StorageFactory & factory) +void registerStorageLinReg(StorageFactory & factory) { - factory.registerStorage("LinRegg", [](const StorageFactory::Arguments & args) + factory.registerStorage("LinReg", [](const StorageFactory::Arguments & args) { auto linreg_settings = std::make_unique(); linreg_settings->loadFromQuery(*args.storage_def); diff --git a/src/Storages/MLpack/StorageMLmodel.h b/src/Storages/MLpack/StorageMLmodel.h index f648c90b60c9..e2c19c9d3d19 100644 --- a/src/Storages/MLpack/StorageMLmodel.h +++ b/src/Storages/MLpack/StorageMLmodel.h @@ -6,7 +6,6 @@ #include #include #include -// #include "LinRegSettings.h" namespace DB { diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index c59e7a664603..e25cc05e5eee 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -28,7 +28,6 @@ void registerStorageGenerateRandom(StorageFactory & factory); void registerStorageExecutable(StorageFactory & factory); void registerStorageWindowView(StorageFactory & factory); void registerStorageLinReg(StorageFactory & factory); -void registerStorageLinRegg(StorageFactory & factory); void registerStorageLogReg(StorageFactory & factory); void registerStorageLinearSVM(StorageFactory & factory); @@ -109,7 +108,6 @@ void registerStorages() registerStorageExecutable(factory); registerStorageWindowView(factory); registerStorageLinReg(factory); - registerStorageLinRegg(factory); registerStorageLogReg(factory); registerStorageLinearSVM(factory); From e418323d854918f60a8aaefc860870ef82e189e2 Mon Sep 17 00:00:00 2001 From: antikvist Date: Wed, 18 May 2022 20:09:26 +0000 Subject: [PATCH 07/11] add evaluation --- src/Functions/modelEvaluate.cpp | 27 ++- src/Interpreters/ExternalModelsLoader.cpp | 10 ++ src/Interpreters/ExternalModelsLoader.h | 1 + src/Interpreters/MlpackModel.cpp | 197 ++++++++++++++++++++++ src/Interpreters/MlpackModel.h | 50 ++++++ src/Storages/MLpack/IModel.h | 193 ++++++++++++++++++++- 6 files changed, 469 insertions(+), 9 deletions(-) create mode 100644 src/Interpreters/MlpackModel.cpp create mode 100644 src/Interpreters/MlpackModel.h diff --git a/src/Functions/modelEvaluate.cpp b/src/Functions/modelEvaluate.cpp index 53bb109728ae..c50b18ae9ed3 100644 --- a/src/Functions/modelEvaluate.cpp +++ b/src/Functions/modelEvaluate.cpp @@ -14,7 +14,11 @@ #include #include #include - +#include +#include +#include +#include +#include namespace DB { @@ -99,15 +103,13 @@ class FunctionModelEvaluate final : public IFunction } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t counter_times) const override { const auto * name_col = checkAndGetColumnConst(arguments[0].column.get()); if (!name_col) throw Exception("First argument of function " + getName() + " must be a constant string", ErrorCodes::ILLEGAL_COLUMN); - auto model = models_loader.getModel(name_col->getValue()); - ColumnRawPtrs column_ptrs; Columns materialized_columns; ColumnPtr null_map; @@ -144,7 +146,22 @@ class FunctionModelEvaluate final : public IFunction } } - auto res = model->evaluate(column_ptrs); + + ColumnPtr res; + + const std::string model_name = name_col->getValue(); + LOG_FATAL(&Poco::Logger::root(), "model name {}", model_name); + if (model_name == "lr") { + StoragePtr tableptr = DatabaseCatalog::instance().getTable(StorageID{"default", "testerg"}, CurrentThread::get().getQueryContext()); + std::shared_ptr casted = std::dynamic_pointer_cast(tableptr); + res = casted->getModel()->evaluateModel(column_ptrs); + } + + else + { + auto model = models_loader.getModel(name_col->getValue()); + res = model->evaluate(column_ptrs); + } if (null_map) { diff --git a/src/Interpreters/ExternalModelsLoader.cpp b/src/Interpreters/ExternalModelsLoader.cpp index 317cf0bf1c99..779c895e7f8c 100644 --- a/src/Interpreters/ExternalModelsLoader.cpp +++ b/src/Interpreters/ExternalModelsLoader.cpp @@ -22,6 +22,7 @@ std::shared_ptr ExternalModelsLoader::create( const std::string & config_prefix, const std::string & /* repository_name */) const { String type = config.getString(config_prefix + ".type"); + LOG_FATAL(&Poco::Logger::root(), "type {}", type); ExternalLoadableLifetime lifetime(config, config_prefix + ".lifetime"); /// TODO: add models factory. @@ -33,6 +34,15 @@ std::shared_ptr ExternalModelsLoader::create( lifetime ); } + else if (type == "mlpack") + { + return std::make_unique( + name, + config.getString(config_prefix + ".path"), + config.getString(config_prefix + ".method"), + lifetime + ); + } else { throw Exception("Unknown model type: " + type, ErrorCodes::INVALID_CONFIG_PARAMETER); diff --git a/src/Interpreters/ExternalModelsLoader.h b/src/Interpreters/ExternalModelsLoader.h index 0eeb60008c3e..5aaf6e16fe78 100644 --- a/src/Interpreters/ExternalModelsLoader.h +++ b/src/Interpreters/ExternalModelsLoader.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include diff --git a/src/Interpreters/MlpackModel.cpp b/src/Interpreters/MlpackModel.cpp new file mode 100644 index 000000000000..99b161ca2fce --- /dev/null +++ b/src/Interpreters/MlpackModel.cpp @@ -0,0 +1,197 @@ +#include "MlpackModel.h" +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int BAD_ARGUMENTS; +extern const int CANNOT_LOAD_CATBOOST_MODEL; +extern const int CANNOT_APPLY_CATBOOST_MODEL; +} + +class MlpackModelImpl +{ +public: + MlpackModelImpl(const std::string & model_path_, const std::string & method_) : model_path(model_path_), method(method_) + {} + + ColumnPtr evaluate(const ColumnRawPtrs & columns) const + { + if (columns.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got empty columns list for Mlpack model."); + + for (size_t i = 0; i < columns.size(); ++i) + { + if (!columns[i]->isNumeric()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column {} should be numeric to make float feature.", i); + } + } + + auto result = evalImpl(columns); + return result; + } + +private: + std::string model_path; + std::string method; + + /// Buffer should be allocated with features_count * column->size() elements. + template + void placeColumnAsNumber(const IColumn * column, T * buffer, size_t offset) const + { + size_t size = column->size(); + FieldVisitorConvertToNumber visitor; + for (size_t i = 0; i < size; ++i) + { + /// TODO: Replace with column visitor. + Field field; + column->get(i, field); + buffer[offset + i] = applyVisitor(visitor, field); + } + } + + /// Place columns into buffer, returns column which holds placed data. Buffer should contains column->size() values. + template + void placeNumericColumns(const ColumnRawPtrs & columns, + size_t column_size, size_t columns_amount, double* buffer) const + { + if (column_size == 0) + return; + + size_t result_offset = 0; + + for (size_t i = 0; i < columns_amount; ++i) + { + const auto * column = columns[i]; + if (column->isNumeric()) + { + placeColumnAsNumber(column, buffer, result_offset); + result_offset += column_size; + } + } + } + + ColumnFloat64::MutablePtr evalImpl(const ColumnRawPtrs & columns) const + { + size_t columns_amount = columns.size(); + size_t column_size = columns.front()->size(); + + if (column_size == 0 || columns_amount == 0) + { + return ColumnFloat64::create(column_size); + } + + // double *features_buffer = new double[columns_amount * column_size]; + PODArray float_features(columns_amount * column_size); + auto * features_buffer = float_features.data(); + + placeNumericColumns(columns, column_size, columns_amount, features_buffer); + + arma::mat regressors(features_buffer, column_size, columns_amount); + regressors = regressors.t(); + + auto result = ColumnFloat64::create(column_size); + auto * result_buf = result->getData().data(); + + if (method == "linear") + { + mlpack::regression::LinearRegression lr; + bool load_St = mlpack::data::Load(model_path, "model", lr); + if (!load_St) + throw Exception("Unable to parse Mlpack model from file.", ErrorCodes::CANNOT_LOAD_CATBOOST_MODEL); + + + arma::rowvec answers(column_size); + lr.Predict(regressors, answers); + + for (size_t i = 0; i < column_size; i++) + { + result_buf[i] = answers(i); + } + } + if (method == "logistic") + { + mlpack::regression::LogisticRegression lr; + bool load_st = mlpack::data::Load(model_path, "model", lr); + if (!load_st) + throw Exception("Unable to parse Mlpack model from file.", ErrorCodes::CANNOT_LOAD_CATBOOST_MODEL); + + arma::Row answers(column_size); + lr.Classify(regressors, answers); + + for (size_t i = 0; i < column_size; i ++) + { + result_buf[i] = answers(i); + } + } + + if (method == "svm") + { + mlpack::svm::LinearSVM lr; + bool load_st = mlpack::data::Load(model_path, "model", lr); + if (!load_st) + throw Exception("Unable to parse Mlpack model from file.", ErrorCodes::CANNOT_LOAD_CATBOOST_MODEL); + + arma::Row answers(column_size); + lr.Classify(regressors, answers); + + for (size_t i = 0; i < column_size; i ++) + { + result_buf[i] = answers(i); + } + } + + return result; + } +}; + + +MlpackModel::MlpackModel(std::string name_, std::string model_path_, + std::string method_, + const ExternalLoadableLifetime & lifetime_) + : name(std::move(name_)), model_path(std::move(model_path_)), method(std::move(method_)), lifetime(lifetime_) +{ + model = std::make_unique(model_path, method); +} + +MlpackModel::~MlpackModel() = default; + +DataTypePtr MlpackModel::getReturnType() const +{ + auto type = std::make_shared(); + return type; +} + +ColumnPtr MlpackModel::evaluate(const ColumnRawPtrs & columns) const +{ + if (!model) + throw Exception("Mlpack model was not loaded.", ErrorCodes::LOGICAL_ERROR); + + return model->evaluate(columns); +} + +} diff --git a/src/Interpreters/MlpackModel.h b/src/Interpreters/MlpackModel.h new file mode 100644 index 000000000000..c21eba86dc86 --- /dev/null +++ b/src/Interpreters/MlpackModel.h @@ -0,0 +1,50 @@ +#include +#include +#include +#include "CatBoostModel.h" + +namespace DB +{ + +class MlpackModelImpl; + +class MlpackModel : public IMLModel +{ +public: + MlpackModel(std::string name, std::string model_path, + std::string method, + const ExternalLoadableLifetime & lifetime); + + ~MlpackModel() override; + + ColumnPtr evaluate(const ColumnRawPtrs & columns) const override; + std::string getTypeName() const override { return "mlpack"; } + + DataTypePtr getReturnType() const override; + + const ExternalLoadableLifetime & getLifetime() const override { return lifetime; } + + std::string getLoadableName() const override { return name; } + + bool supportUpdates() const override { return true; } + + bool isModified() const override { return true; } + + std::shared_ptr clone() const override + { + return std::make_shared(name, model_path, method, lifetime); + } + +private: + const std::string name; + const std::string method; + std::string model_path; + ExternalLoadableLifetime lifetime; + + std::unique_ptr model; + + void init(); +}; + + +} diff --git a/src/Storages/MLpack/IModel.h b/src/Storages/MLpack/IModel.h index 90304e757420..c87743413a4e 100644 --- a/src/Storages/MLpack/IModel.h +++ b/src/Storages/MLpack/IModel.h @@ -1,5 +1,12 @@ #pragma once +// #include +// #include +// #include +// #include +// #include +// #include +// #include #include #include #include @@ -9,6 +16,28 @@ #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int BAD_ARGUMENTS; +extern const int CANNOT_LOAD_CATBOOST_MODEL; +extern const int CANNOT_APPLY_CATBOOST_MODEL; +} namespace DB { @@ -41,10 +70,80 @@ class IModel virtual TrainResult GetModel() const = 0; - virtual void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; + virtual void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) = 0; virtual bool Save(String filepath) = 0; + virtual ColumnPtr evaluateModel(const ColumnRawPtrs & columns) = 0; + +protected: + void checkColumns(const ColumnRawPtrs & columns) + { + if (columns.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Got empty columns list for Mlpack model."); + + for (size_t i = 0; i < columns.size(); ++i) + { + if (!columns[i]->isNumeric()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column {} should be numeric to make float feature.", i); + } + } + } + + arma::mat prepareTestDataset(const ColumnRawPtrs & columns) + { + size_t columns_amount = columns.size(); + size_t column_size = columns.front()->size(); + + PODArray float_features(columns_amount * column_size); + auto * features_buffer = float_features.data(); + + placeNumericColumns(columns, column_size, columns_amount, features_buffer); + + arma::mat regressors(features_buffer, column_size, columns_amount); + regressors = regressors.t(); + + return regressors; + } + +private: + /// Buffer should be allocated with features_count * column->size() elements. + template + void placeColumnAsNumber(const IColumn * column, T * buffer, size_t offset) const + { + size_t size = column->size(); + FieldVisitorConvertToNumber visitor; + for (size_t i = 0; i < size; ++i) + { + /// TODO: Replace with column visitor. + Field field; + column->get(i, field); + buffer[offset + i] = applyVisitor(visitor, field); + } + } + + /// Place columns into buffer, returns column which holds placed data. Buffer should contains column->size() values. + template + void placeNumericColumns(const ColumnRawPtrs & columns, + size_t column_size, size_t columns_amount, double* buffer) const + { + if (column_size == 0) + return; + + size_t result_offset = 0; + + for (size_t i = 0; i < columns_amount; ++i) + { + const auto * column = columns[i]; + if (column->isNumeric()) + { + placeColumnAsNumber(column, buffer, result_offset); + result_offset += column_size; + } + } + } + }; class LinReg: public IModel @@ -61,7 +160,7 @@ class LinReg: public IModel TrainResult GetModel() const override { return LinearRegressionResult{{}, model}; } - void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override + void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { arma::rowvec target = arma::rowvec(target_ptr, n_rows); model.Train(regressors, target); @@ -72,6 +171,31 @@ class LinReg: public IModel return mlpack::data::Save(filepath, "linreg", model); } + ColumnPtr evaluateModel(const ColumnRawPtrs & columns) override + { + checkColumns(columns); + size_t column_size = columns.front()->size(); + + if (column_size == 0) { + return ColumnFloat64::create(column_size); + } + + auto result = ColumnFloat64::create(column_size); + auto * result_buf = result->getData().data(); + + arma::mat regressors = prepareTestDataset(columns); + + arma::rowvec answers(column_size); + model.Predict(regressors, answers); + + for (size_t i = 0; i < column_size; i++) + { + result_buf[i] = answers(i); + } + + return result; + } + protected: mlpack::regression::LinearRegression model; }; @@ -91,7 +215,13 @@ class LogReg: public IModel TrainResult GetModel() const override { return LogisticRegressionResult{{}, model}; } - void Train(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { + // void Train(const arma::mat& regressors, const arma::Row& target) override { + // ens::L_BFGS lbfgsOpt; + + // model.Train(regressors, target, lbfgsOpt); + // } + + void TrainArb(const arma::mat& regressors, Float64 * target_ptr, size_t n_rows) override { PODArray modified_target(n_rows); for (size_t i = 0; i < n_rows; ++i) { @@ -107,6 +237,31 @@ class LogReg: public IModel return mlpack::data::Save(filepath, "logreg", model); } + ColumnPtr evaluateModel(const ColumnRawPtrs & columns) override + { + checkColumns(columns); + size_t column_size = columns.front()->size(); + + if (column_size == 0) { + return ColumnFloat64::create(column_size); + } + + auto result = ColumnFloat64::create(column_size); + auto * result_buf = result->getData().data(); + + arma::mat regressors = prepareTestDataset(columns); + + arma::rowvec answers(column_size); + model.Classify(regressors, answers); + + for (size_t i = 0; i < column_size; i++) + { + result_buf[i] = answers(i); + } + + return result; + } + protected: mlpack::regression::LogisticRegression<> model; @@ -127,7 +282,12 @@ class LinearSVM: public IModel TrainResult GetModel() const override { return LinearSVMResult{{}, model}; } - void Train(const arma::mat& regressors, Float64 * /*target_ptr*/, size_t n_rows) override + // void Train(const arma::mat& regressors, const arma::Row& target) override { + // ens::L_BFGS lbfgsOpt; + // model.Train(regressors, target, 2, lbfgsOpt); + // } + + void TrainArb(const arma::mat& regressors, Float64 * /*target_ptr*/, size_t n_rows) override { // PODArray modified_target; // modified_target.resize(n_rows); @@ -152,6 +312,31 @@ class LinearSVM: public IModel return mlpack::data::Save(filepath, "linearsvm", model); } + ColumnPtr evaluateModel(const ColumnRawPtrs & columns) override + { + checkColumns(columns); + size_t column_size = columns.front()->size(); + + if (column_size == 0) { + return ColumnFloat64::create(column_size); + } + + auto result = ColumnFloat64::create(column_size); + auto * result_buf = result->getData().data(); + + arma::mat regressors = prepareTestDataset(columns); + + arma::rowvec answers(column_size); + model.Classify(regressors, answers); + + for (size_t i = 0; i < column_size; i++) + { + result_buf[i] = answers(i); + } + + return result; + } + protected: mlpack::svm::LinearSVM<> model; }; From 8af8242a6cf62719290e27100888bbd23be0aa0f Mon Sep 17 00:00:00 2001 From: antikvist Date: Thu, 2 Jun 2022 20:27:34 +0000 Subject: [PATCH 08/11] refactor test --- CMakeLists.txt | 3 --- src/Functions/modelEvaluate.cpp | 16 ++++++++++------ src/Storages/MLpack/StorageMLmodel.cpp | 2 +- src/Storages/MLpack/StorageMLmodel.h | 3 +-- .../_0295_check_mlpack_linreg.reference | 1 + .../0_stateless/_0295_check_mlpack_linreg.sql | 5 +++++ 6 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 tests/queries/0_stateless/_0295_check_mlpack_linreg.reference create mode 100644 tests/queries/0_stateless/_0295_check_mlpack_linreg.sql diff --git a/CMakeLists.txt b/CMakeLists.txt index 639d0987d358..82a2f4a74857 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -412,16 +412,13 @@ elseif (OS_FREEBSD) endif () link_libraries(global-group) -<<<<<<< HEAD set (WERROR OFF) -======= if (NOT (OS_LINUX OR OS_DARWIN)) # Using system libs can cause a lot of warnings in includes (on macro expansion). option(WERROR "Enable -Werror compiler option" OFF) else () option(WERROR "Enable -Werror compiler option" ON) endif () ->>>>>>> upstream/master if (WERROR) # Don't pollute CMAKE_CXX_FLAGS with -Werror as it will break some CMake checks. diff --git a/src/Functions/modelEvaluate.cpp b/src/Functions/modelEvaluate.cpp index c50b18ae9ed3..7bfd9b85f5b1 100644 --- a/src/Functions/modelEvaluate.cpp +++ b/src/Functions/modelEvaluate.cpp @@ -82,7 +82,11 @@ class FunctionModelEvaluate final : public IFunction for (size_t i = 1; i < arguments.size(); ++i) has_nullable = has_nullable || arguments[i].type->isNullable(); - auto model = models_loader.getModel(name_col->getValue()); + const std::string model_name = name_col->getValue(); + if (model_name.length() > 7 && model_name.rfind("mlpack", 0) == 0) { + return std::make_shared(); + } + auto model = models_loader.getModel(model_name); auto type = model->getReturnType(); if (has_nullable) @@ -103,7 +107,7 @@ class FunctionModelEvaluate final : public IFunction } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t counter_times) const override + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override { const auto * name_col = checkAndGetColumnConst(arguments[0].column.get()); if (!name_col) @@ -148,11 +152,11 @@ class FunctionModelEvaluate final : public IFunction ColumnPtr res; - const std::string model_name = name_col->getValue(); - LOG_FATAL(&Poco::Logger::root(), "model name {}", model_name); - if (model_name == "lr") { - StoragePtr tableptr = DatabaseCatalog::instance().getTable(StorageID{"default", "testerg"}, CurrentThread::get().getQueryContext()); + + if (model_name.length() > 7 && model_name.rfind("mlpack", 0) == 0) { + const std::string table_name = model_name.substr(7); + StoragePtr tableptr = DatabaseCatalog::instance().getTable(StorageID{"default", table_name}, CurrentThread::get().getQueryContext()); std::shared_ptr casted = std::dynamic_pointer_cast(tableptr); res = casted->getModel()->evaluateModel(column_ptrs); } diff --git a/src/Storages/MLpack/StorageMLmodel.cpp b/src/Storages/MLpack/StorageMLmodel.cpp index 69c80aeb5c2b..3395ca5aeb2d 100644 --- a/src/Storages/MLpack/StorageMLmodel.cpp +++ b/src/Storages/MLpack/StorageMLmodel.cpp @@ -98,7 +98,7 @@ class StorageMLmodelSink : public SinkToStorage arma::mat regressors(buffer, n_rows, n_features); regressors = regressors.t(); - model->Train(regressors, feature_buffers.back().data(), n_rows); + model->TrainArb(regressors, feature_buffers.back().data(), n_rows); // shady bool save_status = model->Save(this->filepath); diff --git a/src/Storages/MLpack/StorageMLmodel.h b/src/Storages/MLpack/StorageMLmodel.h index e2c19c9d3d19..b9f8870c2c5a 100644 --- a/src/Storages/MLpack/StorageMLmodel.h +++ b/src/Storages/MLpack/StorageMLmodel.h @@ -4,7 +4,6 @@ #include #include #include -#include #include namespace DB @@ -34,7 +33,7 @@ class StorageMLmodel final : public IStorage std::string getName() const override { return "MLmodel"; } // maybe check on null? - TrainResult getModel() const { return model->GetModel(); } + IModelPtr getModel() const { return model; } // check on null? String getModelName() const { return model->GetName(); } diff --git a/tests/queries/0_stateless/_0295_check_mlpack_linreg.reference b/tests/queries/0_stateless/_0295_check_mlpack_linreg.reference new file mode 100644 index 000000000000..195d87d193d9 --- /dev/null +++ b/tests/queries/0_stateless/_0295_check_mlpack_linreg.reference @@ -0,0 +1 @@ +1.0000000000000004 diff --git a/tests/queries/0_stateless/_0295_check_mlpack_linreg.sql b/tests/queries/0_stateless/_0295_check_mlpack_linreg.sql new file mode 100644 index 000000000000..3e68de7b3f07 --- /dev/null +++ b/tests/queries/0_stateless/_0295_check_mlpack_linreg.sql @@ -0,0 +1,5 @@ +CREATE TABLE testlinreg1 (x1 Float64, x2 Float64, x3 Float64, y Float64) engine=LinReg('testlinreg1.xml') settings lambda=0.0; +INSERT INTO testlinreg1 (*) values (1, 1, 1, 1); +create table checklinregansw1 (x1 Float64, x2 Float64, x3 Float64) engine=Memory; +INSERT INTO checklinregansw1 (*) values (1, 1, 1); +SELECT modelEvaluate('mlpack testlinreg1', x1, x2, x3) FROM checklinregansw1 From 305234ece0be78e16e3d54cf7cfa6fde03bee2dd Mon Sep 17 00:00:00 2001 From: sabinadayanova Date: Fri, 3 Jun 2022 15:24:04 +0000 Subject: [PATCH 09/11] minor fix --- CMakeLists.txt | 3 --- src/Interpreters/_kek.cpp | 21 --------------------- 2 files changed, 24 deletions(-) delete mode 100644 src/Interpreters/_kek.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 639d0987d358..82a2f4a74857 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -412,16 +412,13 @@ elseif (OS_FREEBSD) endif () link_libraries(global-group) -<<<<<<< HEAD set (WERROR OFF) -======= if (NOT (OS_LINUX OR OS_DARWIN)) # Using system libs can cause a lot of warnings in includes (on macro expansion). option(WERROR "Enable -Werror compiler option" OFF) else () option(WERROR "Enable -Werror compiler option" ON) endif () ->>>>>>> upstream/master if (WERROR) # Don't pollute CMAKE_CXX_FLAGS with -Werror as it will break some CMake checks. diff --git a/src/Interpreters/_kek.cpp b/src/Interpreters/_kek.cpp deleted file mode 100644 index 60efbb0e65ab..000000000000 --- a/src/Interpreters/_kek.cpp +++ /dev/null @@ -1,21 +0,0 @@ -#include -#include -#include - -void popa() { - arma::mat regressors({1.0, 2.0, 3.0}); - arma::rowvec responses({1.0, 4.0, 9.0}); - auto lr = mlpack::regression::LinearRegression(regressors, responses); - arma::mat testX({2.0}); - arma::rowvec testY; - lr.Predict(testX, testY); - // std::cout << testY << std::endl; - - // bool status = mlpack::data::Save("zhopa.bin", "sraka", lr); - - LOG_FATAL(&Poco::Logger::root(), "AOOAOAOOAAOO {}", testY); - // mlpack::regression::LinearRegression lr; - // bool status = mlpack::data::Load("zhopa.bin", "sraka", lr); - // lr.Predict(testX, testY); - // std::cout << status << std::endl; -} From 61e72c165205b9e24d1a120343355c3b54585f1b Mon Sep 17 00:00:00 2001 From: antikvist Date: Mon, 6 Jun 2022 19:38:41 +0000 Subject: [PATCH 10/11] add tests for logreg --- src/Storages/MLpack/IModel.h | 4 ++-- .../queries/0_stateless/_0296_check_mlpack_logreg.reference | 6 ++++++ tests/queries/0_stateless/_0296_check_mlpack_logreg.sql | 5 +++++ .../0_stateless/_0297_check_mlpack_linearsvm.reference | 1 + tests/queries/0_stateless/_0297_check_mlpack_linearsvm.sql | 5 +++++ 5 files changed, 19 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/_0296_check_mlpack_logreg.reference create mode 100644 tests/queries/0_stateless/_0296_check_mlpack_logreg.sql create mode 100644 tests/queries/0_stateless/_0297_check_mlpack_linearsvm.reference create mode 100644 tests/queries/0_stateless/_0297_check_mlpack_linearsvm.sql diff --git a/src/Storages/MLpack/IModel.h b/src/Storages/MLpack/IModel.h index c87743413a4e..4e0a17befa88 100644 --- a/src/Storages/MLpack/IModel.h +++ b/src/Storages/MLpack/IModel.h @@ -251,7 +251,7 @@ class LogReg: public IModel arma::mat regressors = prepareTestDataset(columns); - arma::rowvec answers(column_size); + arma::Row answers(column_size); model.Classify(regressors, answers); for (size_t i = 0; i < column_size; i++) @@ -326,7 +326,7 @@ class LinearSVM: public IModel arma::mat regressors = prepareTestDataset(columns); - arma::rowvec answers(column_size); + arma::Row answers(column_size); model.Classify(regressors, answers); for (size_t i = 0; i < column_size; i++) diff --git a/tests/queries/0_stateless/_0296_check_mlpack_logreg.reference b/tests/queries/0_stateless/_0296_check_mlpack_logreg.reference new file mode 100644 index 000000000000..b6d110f42026 --- /dev/null +++ b/tests/queries/0_stateless/_0296_check_mlpack_logreg.reference @@ -0,0 +1,6 @@ +1 +CREATE TABLE testlinreg1 (x1 Float64, x2 Float64, x3 Float64, y Float64) engine=LinReg('testlinreg1.xml') settings lambda=0.0; +INSERT INTO testlinreg1 (*) values (1, 1, 1, 1); +create table checklinregansw1 (x1 Float64, x2 Float64, x3 Float64) engine=Memory; +INSERT INTO checklinregansw1 (*) values (1, 1, 1); +SELECT modelEvaluate('mlpack testlinreg1', x1, x2, x3) FROM checklinregansw1 diff --git a/tests/queries/0_stateless/_0296_check_mlpack_logreg.sql b/tests/queries/0_stateless/_0296_check_mlpack_logreg.sql new file mode 100644 index 000000000000..a091dbc3f2a2 --- /dev/null +++ b/tests/queries/0_stateless/_0296_check_mlpack_logreg.sql @@ -0,0 +1,5 @@ +CREATE TABLE testlogreg1 (x1 Float64, x2 Float64, x3 Float64, y Float64) engine=LinReg('testlogreg1.xml'); +INSERT INTO testlogreg1 (*) values (1, 1, 1, 1); +create table checklogregansw1 (x1 Float64, x2 Float64, x3 Float64) engine=Memory; +INSERT INTO checklogregansw1 (*) values (1, 1, 1); +SELECT modelEvaluate('mlpack testlogreg1', x1, x2, x3) FROM checklogregansw1 diff --git a/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.reference b/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.reference new file mode 100644 index 000000000000..d00491fd7e5b --- /dev/null +++ b/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.sql b/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.sql new file mode 100644 index 000000000000..0ff6c094a933 --- /dev/null +++ b/tests/queries/0_stateless/_0297_check_mlpack_linearsvm.sql @@ -0,0 +1,5 @@ +CREATE TABLE testsvm1 (x1 Float64, x2 Float64, x3 Float64, y Float64) engine=LinReg('testsvm1.xml'); +INSERT INTO testsvm1 (*) values (1, 1, 1, 1); +create table checksvmansw1 (x1 Float64, x2 Float64, x3 Float64) engine=Memory; +INSERT INTO checksvmansw1 (*) values (1, 1, 1); +SELECT modelEvaluate('mlpack testsvm1', x1, x2, x3) FROM checksvmansw1 From 5ed7af8e04f45ac988e8b76379c7331d2e62d323 Mon Sep 17 00:00:00 2001 From: antikvist Date: Mon, 6 Jun 2022 19:49:02 +0000 Subject: [PATCH 11/11] integration --- .../_test_mlpack_model_evaluate/__init__.py | 0 .../config/models_config.xml | 3 ++ .../model/model.xml | 18 +++++++ .../model/model_config.xml | 9 ++++ .../_test_mlpack_model_evaluate/test.py | 48 +++++++++++++++++++ 5 files changed, 78 insertions(+) create mode 100644 tests/integration/_test_mlpack_model_evaluate/__init__.py create mode 100644 tests/integration/_test_mlpack_model_evaluate/config/models_config.xml create mode 100644 tests/integration/_test_mlpack_model_evaluate/model/model.xml create mode 100644 tests/integration/_test_mlpack_model_evaluate/model/model_config.xml create mode 100644 tests/integration/_test_mlpack_model_evaluate/test.py diff --git a/tests/integration/_test_mlpack_model_evaluate/__init__.py b/tests/integration/_test_mlpack_model_evaluate/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/_test_mlpack_model_evaluate/config/models_config.xml b/tests/integration/_test_mlpack_model_evaluate/config/models_config.xml new file mode 100644 index 000000000000..84378df0e8f8 --- /dev/null +++ b/tests/integration/_test_mlpack_model_evaluate/config/models_config.xml @@ -0,0 +1,3 @@ + + /etc/clickhouse-server/model/model_config.xml + diff --git a/tests/integration/_test_mlpack_model_evaluate/model/model.xml b/tests/integration/_test_mlpack_model_evaluate/model/model.xml new file mode 100644 index 000000000000..a324fcbf5876 --- /dev/null +++ b/tests/integration/_test_mlpack_model_evaluate/model/model.xml @@ -0,0 +1,18 @@ + + + + + + 4 + 1 + 4 + 1 + 2.50000000000000111e-01 + 2.50000000000000056e-01 + 2.50000000000000056e-01 + 2.50000000000000056e-01 + + 0.00000000000000000e+00 + 1 + + \ No newline at end of file diff --git a/tests/integration/_test_mlpack_model_evaluate/model/model_config.xml b/tests/integration/_test_mlpack_model_evaluate/model/model_config.xml new file mode 100644 index 000000000000..265a2656270e --- /dev/null +++ b/tests/integration/_test_mlpack_model_evaluate/model/model_config.xml @@ -0,0 +1,9 @@ + + + mlpack + lr + linear + /etc/clickhouse-server/model/model.xml + 0 + + diff --git a/tests/integration/_test_mlpack_model_evaluate/test.py b/tests/integration/_test_mlpack_model_evaluate/test.py new file mode 100644 index 000000000000..b10fabb1dae2 --- /dev/null +++ b/tests/integration/_test_mlpack_model_evaluate/test.py @@ -0,0 +1,48 @@ +import os +import sys +import time + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance( + "node", stay_alive=True, main_configs=["config/models_config.xml"] +) + + +def copy_file_to_container(local_path, dist_path, container_id): + os.system( + "docker cp {local} {cont_id}:{dist}".format( + local=local_path, cont_id=container_id, dist=dist_path + ) + ) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + copy_file_to_container( + os.path.join(SCRIPT_DIR, "model/."), + "/etc/clickhouse-server/model", + node.docker_id, + ) + node.restart_clickhouse() + + yield cluster + + finally: + cluster.shutdown() + + +def test(started_cluster): + if node.is_built_with_memory_sanitizer(): + pytest.skip("Memory Sanitizer cannot work with third-party shared libraries") + + node.query("select modelEvaluate('lr', 1, 1, 1);")