diff --git a/components/bulk_load/CMakeLists.txt b/components/bulk_load/CMakeLists.txt new file mode 100644 index 000000000000..09bea9a0c012 --- /dev/null +++ b/components/bulk_load/CMakeLists.txt @@ -0,0 +1,42 @@ +# Copyright (c) 2024, Percona and/or its affiliates. + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License, version 2.0, +# as published by the Free Software Foundation. + +# This program is also distributed with certain software (including +# but not limited to OpenSSL) that is licensed under separate terms, +# as designated in a particular file or component or in included license +# documentation. The authors of MySQL hereby grant you an additional +# permission to link the program and your derivative works with the +# separately licensed software that they have included with MySQL. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License, version 2.0, for more details. 
+ +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +ADD_DEFINITIONS(-DLOG_COMPONENT_TAG="bulk_load") + +MYSQL_ADD_COMPONENT(bulk_load + bulk_load_component.cc + bulk_loader.cc + bulk_loader/bulk_loader_abstract.cc + bulk_loader/bulk_loader_local.cc + bulk_loader/bulk_loader_s3.cc + data_stream/stream_local/data_stream_local.cc + data_stream/stream_s3/data_stream_s3.cc + stream_parser/stream_parser.cc + LINK_LIBRARIES extra::rapidjson + MODULE_ONLY +) + +target_include_directories( + component_bulk_load + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} +) diff --git a/components/bulk_load/bulk_load_component.cc b/components/bulk_load/bulk_load_component.cc new file mode 100644 index 000000000000..ab2c5fc38893 --- /dev/null +++ b/components/bulk_load/bulk_load_component.cc @@ -0,0 +1,126 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_load_component.h" + +#include + +#include "mysql/components/service_implementation.h" +#include "mysql/components/services/log_builtins.h" + +#include "bulk_loader.h" +#include "bulk_loader/bulk_loader_abstract.h" + +REQUIRES_SERVICE_PLACEHOLDER(log_builtins); +REQUIRES_SERVICE_PLACEHOLDER(log_builtins_string); + +SERVICE_TYPE(log_builtins) *log_bi; +SERVICE_TYPE(log_builtins_string) *log_bs; + +namespace Bulk_load { + +DEFINE_METHOD(Bulk_loader *, create_bulk_loader, + (THD *thd, my_thread_id connection_id, const TABLE *table, + Bulk_source src, const CHARSET_INFO *charset)) { + return Bulk_load::create_loader(thd, connection_id, table, src, charset); +} + +DEFINE_METHOD(void, set_string, + (Bulk_loader *loader, Bulk_string type, std::string value)) { + static_cast(loader)->set_string(type, std::move(value)); +} + +DEFINE_METHOD(void, set_char, + (Bulk_loader *loader, Bulk_char type, unsigned char value)) { + static_cast(loader)->set_char(type, value); +} + +DEFINE_METHOD(void, set_size, + (Bulk_loader *loader, Bulk_size type, size_t value)) { + static_cast(loader)->set_size(type, value); +} + +DEFINE_METHOD(void, set_condition, + (Bulk_loader *loader, Bulk_condition type, bool value)) { + static_cast(loader)->set_condition(type, value); +} + +DEFINE_METHOD(void, set_compression_algorithm, + (Bulk_loader *loader, Bulk_compression_algorithm algorithm)) { + static_cast(loader)->set_compression_algorithm(algorithm); +} + +DEFINE_METHOD(bool, load, (Bulk_loader *loader, size_t &affected_rows)) { + return static_cast(loader)->load(affected_rows); +} + +DEFINE_METHOD(void, drop_bulk_loader, (THD *thd, Bulk_loader *loader)) { + Bulk_load::drop_loader(thd, static_cast(loader)); +} + +} // namespace Bulk_load + +static mysql_service_status_t 
component_init() { + log_bi = mysql_service_log_builtins; + log_bs = mysql_service_log_builtins_string; + + return 0; +} + +static mysql_service_status_t component_deinit() { + return 0; +} + +// clang-format off +BEGIN_SERVICE_IMPLEMENTATION(component_bulk_load, bulk_load_driver) +Bulk_load::create_bulk_loader, Bulk_load::set_string, + Bulk_load::set_char, Bulk_load::set_size, + Bulk_load::set_condition, Bulk_load::set_compression_algorithm, + Bulk_load::load, Bulk_load::drop_bulk_loader, + END_SERVICE_IMPLEMENTATION(); + +BEGIN_COMPONENT_PROVIDES(component_bulk_load) +PROVIDES_SERVICE(component_bulk_load, bulk_load_driver), +// PROVIDES_SERVICE(component_bulk_load, log_builtins), +// PROVIDES_SERVICE(component_bulk_load, log_builtins_string), + END_COMPONENT_PROVIDES(); + +BEGIN_COMPONENT_REQUIRES(component_bulk_load) +REQUIRES_SERVICE(registry), REQUIRES_SERVICE(log_builtins), + REQUIRES_SERVICE(log_builtins_string), + // REQUIRES_SERVICE(bulk_data_load), + //REQUIRES_SERVICE(bulk_data_convert), + END_COMPONENT_REQUIRES(); + +BEGIN_COMPONENT_METADATA(component_bulk_load) + METADATA("mysql.author", "Percona Corporation"), + METADATA("mysql.license", "GPL"), +END_COMPONENT_METADATA(); + +DECLARE_COMPONENT(component_bulk_load, "component_bulk_load") + component_init, component_deinit, + END_DECLARE_COMPONENT(); + +DECLARE_LIBRARY_COMPONENTS &COMPONENT_REF(component_bulk_load) + END_DECLARE_LIBRARY_COMPONENTS + +// clang-format on diff --git a/components/bulk_load/bulk_load_component.h b/components/bulk_load/bulk_load_component.h new file mode 100644 index 000000000000..44bbad9da74c --- /dev/null +++ b/components/bulk_load/bulk_load_component.h @@ -0,0 +1,60 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. 
+ + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include +#include +#include + +#include "bulk_loader/bulk_loader_abstract.h" + +extern REQUIRES_SERVICE_PLACEHOLDER(registry); +//extern REQUIRES_SERVICE_PLACEHOLDER(bulk_data_load); +//extern REQUIRES_SERVICE_PLACEHOLDER(bulk_data_convert); + +namespace Bulk_load { + +DEFINE_METHOD(Bulk_loader *, create_bulk_loader, + (THD *thd, my_thread_id connection_id, const TABLE *table, + Bulk_source src, const CHARSET_INFO *charset)); + +DEFINE_METHOD(void, set_string, + (Bulk_loader_impl *loader, Bulk_string type, std::string value)); + +DEFINE_METHOD(void, set_char, + (Bulk_loader_impl *loader, Bulk_char type, unsigned char value)); + +DEFINE_METHOD(void, set_size, + (Bulk_loader_impl *loader, Bulk_size type, size_t value)); + +DEFINE_METHOD(void, set_condition, + (Bulk_loader_impl *loader, Bulk_condition type, bool value)); + +DEFINE_METHOD(void, set_compression_algorithm, + (Bulk_loader_impl *loader, Bulk_compression_algorithm algorithm)); + +DEFINE_METHOD(bool, load, (Bulk_loader_impl *loader, size_t &affected_rows)); + +DEFINE_METHOD(void, drop_bulk_loader, (THD *thd, 
Bulk_loader_impl *loader)); + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader.cc b/components/bulk_load/bulk_loader.cc new file mode 100644 index 000000000000..b82fa5a7ea6b --- /dev/null +++ b/components/bulk_load/bulk_loader.cc @@ -0,0 +1,58 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include "bulk_loader.h"
+
+#include <new>
+
+#include "bulk_loader/bulk_loader_local.h"
+#include "bulk_loader/bulk_loader_s3.h"
+
+namespace Bulk_load {
+
+/**
+  Create the loader implementation that matches the requested data source.
+
+  @param[in] thd            session the loader is created for
+  @param[in] connection_id  id of the connection, stored in the loader
+  @param[in] table          target table, stored in the loader
+  @param[in] src            kind of data source to read from
+  @param[in] charset        character set, stored in the loader
+
+  @return loader allocated on the heap, to be handed back to drop_loader();
+          nullptr when the source is not supported, when src is not a known
+          enumerator, or when the allocation fails.
+*/
+Bulk_loader_impl *create_loader(THD *thd, my_thread_id connection_id,
+                                const TABLE *table, Bulk_source src,
+                                const CHARSET_INFO *charset) noexcept {
+  // This function is noexcept, so a throwing `new` would terminate the server
+  // on out-of-memory; the nothrow form reports the failure as nullptr instead.
+  switch (src) {
+    case Bulk_source::LOCAL:
+      return new (std::nothrow)
+          Bulk_loader_local(thd, connection_id, table, charset);
+    case Bulk_source::S3:
+      return new (std::nothrow)
+          Bulk_loader_s3(thd, connection_id, table, charset);
+    case Bulk_source::OCI:
+      break;  // not supported
+  }
+
+  // Unsupported source, or a value outside of the enumeration: never index
+  // anything with it, just report that no loader is available.
+  return nullptr;
+}
+
+/**
+  Destroy a loader created by create_loader().
+
+  @param[in] thd     session the loader belongs to (currently unused)
+  @param[in] loader  loader to destroy; nullptr is accepted and ignored
+*/
+void drop_loader(THD *thd [[maybe_unused]],
+                 Bulk_loader_impl *loader) noexcept {
+  // create_loader() allocates with `new` and nothing else owns the object,
+  // so it has to be released here. Bulk_loader_impl has a virtual destructor,
+  // deleting through the base pointer is well defined.
+  delete loader;
+}
+
+}  // namespace Bulk_load
diff --git a/components/bulk_load/bulk_loader.h b/components/bulk_load/bulk_loader.h
new file mode 100644
index 000000000000..b565b70bb983
--- /dev/null
+++ b/components/bulk_load/bulk_loader.h
@@ -0,0 +1,46 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+#include "bulk_loader/bulk_loader_abstract.h"
+
+#include "mysql/components/services/bulk_load_service.h"
+
+namespace Bulk_load {
+
+/**
+  Create the loader implementation for the given data source.
+
+  @return loader to be released with drop_loader(), or nullptr when no
+          loader can be provided for src.
+*/
+Bulk_loader_impl *create_loader(THD *thd, my_thread_id connection_id,
+                                const TABLE *table, Bulk_source src,
+                                const CHARSET_INFO *charset) noexcept;
+
+/**
+  Destroy a loader obtained from create_loader(). A null loader is ignored.
+*/
+void drop_loader(THD *thd, Bulk_loader_impl *loader) noexcept;
+
+}  // namespace Bulk_load
diff --git a/components/bulk_load/bulk_loader/bulk_loader_abstract.cc b/components/bulk_load/bulk_loader/bulk_loader_abstract.cc
new file mode 100644
index 000000000000..9ad4d3c048e9
--- /dev/null
+++ b/components/bulk_load/bulk_loader/bulk_loader_abstract.cc
@@ -0,0 +1,326 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include "bulk_loader_abstract.h"
+
+#include "components/bulk_load/stream_parser/stream_parser.h"
+
+#include <mysql/components/services/log_builtins.h>
+#include <mysqld_error.h>
+
+#include "scope_guard.h"
+
+#include <iostream>
+#include <sstream>
+
+namespace Bulk_load {
+namespace {
+
+/** Buffer size passed to the stream parser through its parameters. */
+const size_t parser_buffer_size = 1024;
+
+}  // namespace
+
+/**
+  Store the session, connection, table and charset the loader works with.
+  The compression algorithm starts as NONE; attribute maps start empty.
+*/
+Bulk_loader_impl::Bulk_loader_impl(THD *thd, my_thread_id connection_id,
+                                   const TABLE *table,
+                                   const CHARSET_INFO *charset)
+    : m_thd{thd},
+      m_connection_id{connection_id},
+      m_table{table},
+      m_charset{charset},
+      m_compression_algorithm{Bulk_compression_algorithm::NONE} {}
+
+THD *Bulk_loader_impl::get_thd() noexcept {
+  return m_thd;
+}
+
+my_thread_id Bulk_loader_impl::get_connection_id() const noexcept {
+  return m_connection_id;
+}
+
+const TABLE *Bulk_loader_impl::get_table() noexcept {
+  return m_table;
+}
+
+const CHARSET_INFO *Bulk_loader_impl::get_charset() noexcept {
+  return m_charset;
+}
+
+/** Set (or overwrite) a string attribute. */
+void Bulk_loader_impl::set_string(Bulk_string type,
+                                  std::string value) noexcept {
+  m_string_attrs[type] = std::move(value);
+}
+
+/** Set (or overwrite) a single-character attribute. */
+void Bulk_loader_impl::set_char(Bulk_char type, unsigned char value) noexcept {
+  m_char_attrs[type] = value;
+}
+
+/** Set (or overwrite) a size attribute. */
+void Bulk_loader_impl::set_size(Bulk_size type, size_t value) noexcept {
+  m_size_attrs[type] = value;
+}
+
+/** Set (or overwrite) a boolean attribute. */
+void Bulk_loader_impl::set_condition(Bulk_condition type, bool value) noexcept {
+  m_condition_attrs[type] = value;
+}
+
+/** @return the string attribute, or an empty string when it was never set. */
+std::string Bulk_loader_impl::get_string(Bulk_string type) const noexcept {
+  const auto it = m_string_attrs.find(type);
+  return it != m_string_attrs.cend() ? it->second : "";
+}
+
+/** @return the character attribute, or '\0' when it was never set. */
+unsigned char Bulk_loader_impl::get_char(Bulk_char type) const noexcept {
+  const auto it = m_char_attrs.find(type);
+  return it != m_char_attrs.cend() ? it->second : '\0';
+}
+
+/** @return the size attribute, or 0 when it was never set. */
+size_t Bulk_loader_impl::get_size(Bulk_size type) const noexcept {
+  const auto it = m_size_attrs.find(type);
+  return it != m_size_attrs.cend() ? it->second : 0;
+}
+
+/** @return the boolean attribute, or false when it was never set. */
+bool Bulk_loader_impl::get_condition(Bulk_condition type) const noexcept {
+  const auto it = m_condition_attrs.find(type);
+  return it != m_condition_attrs.cend() ? it->second : false;
+}
+
+void Bulk_loader_impl::set_compression_algorithm(
+    Bulk_compression_algorithm algorithm) noexcept {
+  m_compression_algorithm = algorithm;
+}
+
+Bulk_compression_algorithm Bulk_loader_impl::get_compression_algorithm()
+    const noexcept {
+  return m_compression_algorithm;
+}
+
+/**
+  Acquire the registry and the "bulk_data_convert" / "bulk_data_load"
+  services from it.
+
+  @return true when everything was acquired. On false some handles may
+          already be held; release_services() copes with any partial state.
+*/
+bool Bulk_loader_impl::acquire_services() noexcept {
+  m_srv_registry = mysql_plugin_registry_acquire();
+
+  if (m_srv_registry == nullptr) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot acquire registry");
+    return false;
+  }
+
+  if (m_srv_registry->acquire("bulk_data_convert", &m_svc_data_convert) != 0) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot acquire data service");
+    return false;
+  }
+
+  if (m_srv_registry->acquire("bulk_data_load", &m_svc_data_load) != 0) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot acquire load service");
+    return false;
+  }
+
+  return true;
+}
+
+/**
+  Release whatever acquire_services() managed to acquire. Failures are only
+  logged. All handles are reset afterwards, so calling this twice, or
+  running load() again on the same object, never releases a handle twice.
+*/
+void Bulk_loader_impl::release_services() noexcept {
+  if (m_srv_registry == nullptr) {
+    return;
+  }
+
+  if (m_svc_data_convert != nullptr &&
+      m_srv_registry->release(m_svc_data_convert) != 0) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot release data service");
+  }
+  m_svc_data_convert = nullptr;
+
+  if (m_svc_data_load != nullptr &&
+      m_srv_registry->release(m_svc_data_load) != 0) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot release load service");
+  }
+  m_svc_data_load = nullptr;
+
+  if (mysql_plugin_registry_release(m_srv_registry) != 0) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Cannot release registry");
+  }
+  m_srv_registry = nullptr;
+}
+
+/** @return the acquired handle viewed as the bulk_data_convert service. */
+SERVICE_TYPE(bulk_data_convert) *
+Bulk_loader_impl::data_convert_service() noexcept {
+  return reinterpret_cast<SERVICE_TYPE(bulk_data_convert) *>(
+      m_svc_data_convert);
+}
+
+/** @return the acquired handle viewed as the bulk_data_load service. */
+SERVICE_TYPE(bulk_data_load) *Bulk_loader_impl::data_load_service() noexcept {
+  return reinterpret_cast<SERVICE_TYPE(bulk_data_load) *>(m_svc_data_load);
+}
+
+/**
+  Begin a load session through the bulk_data_load service.
+
+  @return the context returned by begin(), nullptr (after logging) on failure.
+          The session is marked as started in both cases, so end_session()
+          will still call end() afterwards.
+*/
+void *Bulk_loader_impl::start_session(size_t data_size, size_t se_memory_size,
+                                      size_t num_threads) noexcept {
+  m_load_session_start_done = true;
+  auto *load_ctx = data_load_service()->begin(m_thd, m_table, data_size,
+                                              se_memory_size, num_threads);
+
+  if (load_ctx == nullptr) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Failed to start session");
+  }
+
+  return load_ctx;
+}
+
+/**
+  End a session started with start_session(); no-op when none was started.
+  The error flag passed to end() is true only when load_ctx is nullptr.
+*/
+void Bulk_loader_impl::end_session(void *load_ctx) noexcept {
+  if (!m_load_session_start_done) {
+    return;
+  }
+
+  if (!data_load_service()->end(m_thd, load_ctx, m_table,
+                                load_ctx == nullptr)) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Failed to end session");
+  }
+
+  m_load_session_start_done = false;
+}
+
+/** Build the parser parameters from the attributes set on this loader. */
+ParserParamsPtr Bulk_loader_impl::get_parser_params() const noexcept {
+  return std::make_unique<ParserParamsPtr::element_type>(
+      get_string(Bulk_string::COLUMN_TERM),
+      get_string(Bulk_string::ROW_TERM),
+      get_char(Bulk_char::ESCAPE_CHAR),
+      get_char(Bulk_char::ENCLOSE_CHAR),
+      get_size(Bulk_size::COUNT_ROW_SKIP),
+      parser_buffer_size);
+}
+
+/**
+  Dump every attribute of the loader to stdout.
+
+  NOTE(review): temporary debug aid; it writes to the server's stdout, not to
+  the error log. Remove it or route it through the logger before release.
+*/
+void Bulk_loader_impl::settings_dump_to_log() {
+  std::cout << "Bulk_loader_impl settings =====================================" << std::endl;
+  std::cout << "schema_name: " << get_string(Bulk_string::SCHEMA_NAME) << std::endl;
+  std::cout << "table_name: " << get_string(Bulk_string::TABLE_NAME) << std::endl;
+  std::cout << "file_prefix: " << get_string(Bulk_string::FILE_PREFIX) << std::endl;
+  std::cout << "file_suffix: " << get_string(Bulk_string::FILE_SUFFIX) << std::endl;
+  std::cout << "column_term: " << get_string(Bulk_string::COLUMN_TERM) << std::endl;
+  std::cout << "row_term: " << get_string(Bulk_string::ROW_TERM) << std::endl;
+  std::cout << "append_to_last_prefix: " << get_string(Bulk_string::APPENDTOLASTPREFIX) << std::endl;
+  std::cout << "escape_char: " << get_char(Bulk_char::ESCAPE_CHAR) << std::endl;
+  std::cout << "enclose_char: " << get_char(Bulk_char::ENCLOSE_CHAR) << std::endl;
+  std::cout << "count_files: " << get_size(Bulk_size::COUNT_FILES) << std::endl;
+  std::cout << "count_row_skip: " << get_size(Bulk_size::COUNT_ROW_SKIP) << std::endl;
+  std::cout << "count_columns: " << get_size(Bulk_size::COUNT_COLUMNS) << std::endl;
+  std::cout << "concurrency: " << get_size(Bulk_size::CONCURRENCY) << std::endl;
+  std::cout << "memory: " << get_size(Bulk_size::MEMORY) << std::endl;
+  std::cout << "start_index: " << get_size(Bulk_size::START_INDEX) << std::endl;
+  std::cout << "ordered_data: " << get_condition(Bulk_condition::ORDERED_DATA) << std::endl;
+  std::cout << "optional_enclose: " << get_condition(Bulk_condition::OPTIONAL_ENCLOSE) << std::endl;
+  std::cout << "dryrun: " << get_condition(Bulk_condition::DRYRUN) << std::endl;
+  std::cout << "compression_algorithm: " << (m_compression_algorithm == Bulk_compression_algorithm::NONE ? "NONE" : "ZSTD") << std::endl;
+  std::cout << "end settings ==================================================" << std::endl;
+}
+
+/**
+  Read the data stream, convert the parsed rows to MySQL format and hand them
+  to the storage engine through the bulk_data_load service.
+
+  @param[out] affected_rows  set to 0 on entry and, on success, to the number
+                             of converted rows passed to the load service
+
+  @return true on success, false on any failure (the reason is logged).
+*/
+bool Bulk_loader_impl::load(size_t &affected_rows) noexcept {
+  // Never leave the out-parameter untouched on a failure path.
+  affected_rows = 0;
+
+  settings_dump_to_log();
+
+  void *load_ctx = nullptr;
+  bool succeeded = false;
+
+  auto cleanup_guard = create_scope_guard([&]() {
+    if (!succeeded && load_ctx != nullptr && m_load_session_start_done) {
+      // end_session() derives the error flag from a null context only, so a
+      // failure that happens after a successful begin() would be reported to
+      // the storage engine as success. Report the error explicitly here.
+      if (!data_load_service()->end(m_thd, load_ctx, m_table, true)) {
+        LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                        "Failed to end session");
+      }
+      m_load_session_start_done = false;
+    } else {
+      end_session(load_ctx);
+    }
+    release_services();
+  });
+
+  if (!acquire_services()) {
+    return false;
+  }
+
+  // init stream reader
+  auto data_stream = get_data_stream();
+  if (!data_stream->open()) {
+    return false;
+  }
+
+  const auto *srv_data_convert = data_convert_service();
+  const auto *srv_data_load = data_load_service();
+
+  const auto data_size = data_stream->get_data_size();
+  const auto se_memory_size = srv_data_load->get_se_memory_size(m_thd, m_table);
+
+  std::cout << "Bulk_loader_local::load file_size: " << data_size << std::endl;
+  std::cout << "Bulk_loader_local::load se_memory_size: " << se_memory_size << std::endl;
+
+  load_ctx = start_session(data_size, se_memory_size, 1);
+
+  if (load_ctx == nullptr) {
+    return false;
+  }
+
+  Row_meta row_metadata;
+
+  if (!srv_data_convert->get_row_metadata(m_thd, m_table, false,
+                                          row_metadata)) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Failed to init row metadata");
+    return false;
+  }
+
+  auto parser_params = get_parser_params();
+  StreamParser stream_parser{row_metadata.m_num_columns,
+                             std::move(parser_params),
+                             data_stream.get()};
+
+  // init buffers
+  Rows_text text_rows{row_metadata.m_num_columns};
+  text_rows.set_num_rows(1);
+  size_t next_index = 0;
+
+  Rows_mysql sql_rows{row_metadata.m_num_columns};
+  char sql_rows_buffer[100] = "";
+  // Derive the length from the array so the two can never disagree.
+  size_t sql_rows_buffer_length = sizeof(sql_rows_buffer);
+  Bulk_load_error_location_details error_details;
+
+  // process data
+  auto row_iterator = stream_parser.row_iterator();
+
+  for (auto &row : row_iterator) {
+    text_rows.process_columns(row.index, &row.getter);
+  }
+
+  auto ret = srv_data_convert->mysql_format(
+      m_thd, m_table, text_rows, next_index, sql_rows_buffer,
+      sql_rows_buffer_length, m_charset, row_metadata, sql_rows, error_details);
+
+  std::cout << "Bulk_loader_local::load srv_data_convert->mysql_format_from_raw ret: " << ret << std::endl;
+  std::cout << "Bulk_loader_local::load text_rows.get_num_rows: " << text_rows.get_num_rows() << std::endl;
+  std::cout << "Bulk_loader_local::load text_rows.get_num_cols: " << text_rows.get_num_cols() << std::endl;
+  std::cout << "Bulk_loader_local::load sql_rows.get_num_rows: " << sql_rows.get_num_rows() << std::endl;
+  std::cout << "Bulk_loader_local::load sql_rows.get_num_cols: " << sql_rows.get_num_cols() << std::endl;
+
+  if (ret != 0) {
+    std::stringstream error;
+    error << "Failed to format data, filename: " << error_details.filename
+          << ", row_number: " << error_details.row_number
+          << ", column_name: " << error_details.column_name
+          << ", column_type: " << error_details.column_type
+          << ", column_input_data: " << error_details.column_input_data;
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    error.str().c_str());
+    return false;
+  }
+
+  Bulk_load::Stat_callbacks wait_cbks{
+      []() {},
+      []() {}
+  };
+
+  if (!srv_data_load->load(m_thd, load_ctx, m_table, sql_rows, 0, wait_cbks)) {
+    LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR,
+                    "Failed to load data");
+    return false;
+  }
+
+  affected_rows = sql_rows.get_num_rows();
+  // From here on the cleanup guard ends the session without the error flag.
+  succeeded = true;
+  return true;
+}
+
+}  // namespace Bulk_load
diff --git a/components/bulk_load/bulk_loader/bulk_loader_abstract.h b/components/bulk_load/bulk_loader/bulk_loader_abstract.h
new file mode 100644
index 000000000000..89ec59d7350e
--- /dev/null
+++ b/components/bulk_load/bulk_loader/bulk_loader_abstract.h
@@ -0,0 +1,108 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+ + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "mysql/service_plugin_registry.h" +#include "mysql/components/service.h" +#include "mysql/components/services/registry.h" +#include "mysql/components/services/bulk_data_service.h" +#include "mysql/components/services/bulk_load_service.h" + +#include "stream_parser/parser_params.h" +#include "components/bulk_load/data_stream/data_stream_abstract.h" + +#include +#include + +namespace Bulk_load { + +class Bulk_loader_impl { + public: + Bulk_loader_impl(THD *thd, my_thread_id connection_id, const TABLE *table, + const CHARSET_INFO *charset); + + Bulk_loader_impl(const Bulk_loader_impl &other) = delete; + Bulk_loader_impl(Bulk_loader_impl &&other) = delete; + virtual ~Bulk_loader_impl() = default; + Bulk_loader_impl &operator=(const Bulk_loader_impl &other) = delete; + Bulk_loader_impl &operator=(Bulk_loader_impl &&other) = delete; + + void set_string(Bulk_string type, std::string value) noexcept; + void set_char(Bulk_char type, unsigned char value) noexcept; + void set_size(Bulk_size type, size_t value) noexcept; + void set_condition(Bulk_condition type, bool value) noexcept; 
+ void set_compression_algorithm(Bulk_compression_algorithm algorithm) noexcept; + + bool load(size_t &affected_rows) noexcept; + + protected: + // temporary debug + void settings_dump_to_log(); + + THD *get_thd() noexcept; + my_thread_id get_connection_id() const noexcept; + const TABLE *get_table() noexcept; + const CHARSET_INFO *get_charset() noexcept; + + std::string get_string(Bulk_string type) const noexcept; + unsigned char get_char(Bulk_char type) const noexcept; + size_t get_size(Bulk_size type) const noexcept; + bool get_condition(Bulk_condition type) const noexcept; + Bulk_compression_algorithm get_compression_algorithm() const noexcept; + + bool acquire_services() noexcept; + void release_services() noexcept; + SERVICE_TYPE(bulk_data_convert) *data_convert_service() noexcept; + SERVICE_TYPE(bulk_data_load) *data_load_service() noexcept; + + void *start_session(size_t data_size, size_t se_memory_size, size_t num_threads) noexcept; + void end_session(void *load_ctx) noexcept; + + ParserParamsPtr get_parser_params() const noexcept; + + private: + virtual std::unique_ptr get_data_stream() noexcept = 0; + + private: + THD *m_thd; + my_thread_id m_connection_id; + const TABLE *m_table; + const CHARSET_INFO *m_charset; + + std::unordered_map m_string_attrs; + std::unordered_map m_char_attrs; + std::unordered_map m_size_attrs; + std::unordered_map m_condition_attrs; + + Bulk_compression_algorithm m_compression_algorithm; + + SERVICE_TYPE(registry) *m_srv_registry = nullptr; + my_h_service m_svc_data_convert = nullptr; + my_h_service m_svc_data_load = nullptr; + bool m_load_session_start_done = false; +}; + +template +class Bulk_loader_base; + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_local.cc b/components/bulk_load/bulk_loader/bulk_loader_local.cc new file mode 100644 index 000000000000..e9f3326f59ce --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_local.cc @@ -0,0 +1,37 @@ +/* Copyright (c) 2024, 
Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_loader_local.h" + +namespace Bulk_load { + +Bulk_loader_base::Bulk_loader_base( + THD *thd, my_thread_id connection_id, const TABLE *table, + const CHARSET_INFO *charset) + : Bulk_loader_impl(thd, connection_id, table, charset) {} + +std::unique_ptr Bulk_loader_local::get_data_stream() noexcept { + return std::make_unique( + get_string(Bulk_string::FILE_PREFIX)); +} + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_local.h b/components/bulk_load/bulk_loader/bulk_loader_local.h new file mode 100644 index 000000000000..b30ff131ef74 --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_local.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. 
+ + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "bulk_loader_abstract.h" + +#include "components/bulk_load/data_stream/stream_local/data_stream_local.h" + +namespace Bulk_load { + +template <> +class Bulk_loader_base : public Bulk_loader_impl { + public: + Bulk_loader_base(THD *thd, my_thread_id connection_id, + const TABLE *table, const CHARSET_INFO *charset); + + private: + std::unique_ptr get_data_stream() noexcept override; +}; + +using Bulk_loader_local = Bulk_loader_base; + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_s3.cc b/components/bulk_load/bulk_loader/bulk_loader_s3.cc new file mode 100644 index 000000000000..8d81c83167f7 --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_s3.cc @@ -0,0 +1,36 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. 
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include "bulk_loader_s3.h"
+
+namespace Bulk_load {
+
+/**
+  Constructs the loader by forwarding all four arguments, unchanged, to the
+  Bulk_loader_impl base class.
+
+  NOTE(review): the template arguments of Bulk_loader_base / Bulk_loader_impl
+  are missing in this copy of the patch - restore them from the original
+  commit.
+*/
+Bulk_loader_base::Bulk_loader_base(
+    THD *thd, my_thread_id connection_id, const TABLE *table,
+    const CHARSET_INFO *charset)
+    : Bulk_loader_impl(thd, connection_id, table, charset) {}
+
+/**
+  Creates the input data stream for this loader, handing it the string
+  returned by get_string(Bulk_string::FILE_PREFIX).
+
+  NOTE(review): the template arguments of std::unique_ptr / std::make_unique
+  are missing here; presumably the created object is a DataStreamS3 - confirm
+  against the original commit.
+*/
+std::unique_ptr Bulk_loader_s3::get_data_stream() noexcept {
+  return std::make_unique(get_string(Bulk_string::FILE_PREFIX));
+}
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/bulk_loader/bulk_loader_s3.h b/components/bulk_load/bulk_loader/bulk_loader_s3.h
new file mode 100644
index 000000000000..4eadac05eb88
--- /dev/null
+++ b/components/bulk_load/bulk_loader/bulk_loader_s3.h
@@ -0,0 +1,43 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+#include "bulk_loader_abstract.h"
+
+#include "components/bulk_load/data_stream/stream_s3/data_stream_s3.h"
+
+namespace Bulk_load {
+
+/**
+  Explicit specialization of Bulk_loader_base, derived from Bulk_loader_impl,
+  that supplies its own get_data_stream() override. It is exported below under
+  the alias Bulk_loader_s3.
+
+  NOTE(review): the template arguments of Bulk_loader_base, Bulk_loader_impl
+  and std::unique_ptr are missing in this copy of the patch - restore them
+  from the original commit.
+*/
+template <>
+class Bulk_loader_base : public Bulk_loader_impl {
+ public:
+  /** Defined out of line; takes the same four arguments as the base class. */
+  Bulk_loader_base(THD *thd, my_thread_id connection_id,
+                   const TABLE *table, const CHARSET_INFO *charset);
+
+ private:
+  /** Factory for the input data stream used by this loader. */
+  std::unique_ptr get_data_stream() noexcept override;
+};
+
+using Bulk_loader_s3 = Bulk_loader_base;
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/data_stream/data_stream_abstract.h b/components/bulk_load/data_stream/data_stream_abstract.h
new file mode 100644
index 000000000000..32963f65ac64
--- /dev/null
+++ b/components/bulk_load/data_stream/data_stream_abstract.h
@@ -0,0 +1,42 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+ + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include + +namespace Bulk_load { + +class DataStreamAbstract { + public: + virtual ~DataStreamAbstract() = default; + + virtual bool open() noexcept = 0; + virtual size_t read(char *buffer, size_t size) noexcept = 0; + + size_t get_data_size() const noexcept { return m_data_size; } + + protected: + size_t m_data_size; +}; + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_local/data_stream_local.cc b/components/bulk_load/data_stream/stream_local/data_stream_local.cc new file mode 100644 index 000000000000..ed6a8404f20d --- /dev/null +++ b/components/bulk_load/data_stream/stream_local/data_stream_local.cc @@ -0,0 +1,80 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. 
+ + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "data_stream_local.h" + +#include +#include + +#include +#include +#include + +namespace Bulk_load { + +DataStreamLocal::DataStreamLocal(std::filesystem::path path) + : m_file_path{std::move(path)} {} + +DataStreamLocal::~DataStreamLocal() { + m_stream.close(); +} + +bool DataStreamLocal::open() noexcept { + if (m_file_path.empty()) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Empty file name"); + return false; + } + + if (!std::filesystem::exists(m_file_path)) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot find file"); + return false; + } + + m_data_size = std::filesystem::file_size(m_file_path); + + if (m_data_size == 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Empty file"); + return false; + } + + m_stream.open(m_file_path, std::ios_base::in | std::ios_base::binary); + + if (!m_stream.is_open()) { + std::stringstream error; + error << "Failed to open file: " << strerror(errno); + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + error.str().c_str()); + return false; + } + + return true; +} + 
+size_t DataStreamLocal::read(char *buffer, size_t size) noexcept { + m_stream.read(buffer, size); + return m_stream.gcount(); +} + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_local/data_stream_local.h b/components/bulk_load/data_stream/stream_local/data_stream_local.h new file mode 100644 index 000000000000..cf22e4065329 --- /dev/null +++ b/components/bulk_load/data_stream/stream_local/data_stream_local.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+#include "components/bulk_load/data_stream/data_stream_abstract.h"
+
+// NOTE(review): the header names on the next two #include lines are missing
+// in this copy of the patch; the class below needs std::filesystem::path and
+// std::ifstream - restore them from the original commit.
+#include
+#include
+
+namespace Bulk_load {
+
+/**
+  DataStreamAbstract implementation that holds a filesystem path and a
+  std::ifstream.
+*/
+class DataStreamLocal : public DataStreamAbstract {
+ public:
+  /** @param path  path stored in m_file_path. */
+  explicit DataStreamLocal(std::filesystem::path path);
+  ~DataStreamLocal() override;
+
+  bool open() noexcept override;
+  size_t read(char *buffer, size_t size) noexcept override;
+
+ private:
+  std::filesystem::path m_file_path;
+  std::ifstream m_stream;
+};
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc b/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc
new file mode 100644
index 000000000000..44c56c6fd92c
--- /dev/null
+++ b/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc
@@ -0,0 +1,37 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "data_stream_s3.h" + +namespace Bulk_load { + +DataStreamS3::DataStreamS3(std::filesystem::path path [[maybe_unused]]) {} + +bool DataStreamS3::open() noexcept { + return true; +} + +size_t DataStreamS3::read(char *buffer [[maybe_unused]], size_t size [[maybe_unused]]) noexcept { + return 0; +} + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_s3/data_stream_s3.h b/components/bulk_load/data_stream/stream_s3/data_stream_s3.h new file mode 100644 index 000000000000..f4b3e3b76880 --- /dev/null +++ b/components/bulk_load/data_stream/stream_s3/data_stream_s3.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. 
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+#include "components/bulk_load/data_stream/data_stream_abstract.h"
+
+// NOTE(review): the header name on the next #include line is missing in this
+// copy of the patch; the class below needs std::filesystem::path - restore it
+// from the original commit.
+#include
+
+namespace Bulk_load {
+
+/**
+  DataStreamAbstract implementation constructed from a filesystem path.
+  NOTE(review): presumably meant to read objects from S3 - confirm.
+*/
+class DataStreamS3 : public DataStreamAbstract {
+ public:
+  explicit DataStreamS3(std::filesystem::path path);
+
+  bool open() noexcept override;
+  size_t read(char *buffer, size_t size) noexcept override;
+};
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/stream_parser/parser_iterator.h b/components/bulk_load/stream_parser/parser_iterator.h
new file mode 100644
index 000000000000..3ccb91f4f835
--- /dev/null
+++ b/components/bulk_load/stream_parser/parser_iterator.h
@@ -0,0 +1,65 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "parser_row.h" + +#include +#include + +namespace Bulk_load { + +struct ParserIterator { + +struct promise_type { + ParserRow *m_row; + + void unhandled_exception() noexcept {} + ParserIterator get_return_object() { return ParserIterator{*this}; } + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_always yield_value(ParserRow row) noexcept { + m_row = std::move(row); + return {}; + } + void return_value(ParserRow row) { m_row = std::move(row); } + std::suspend_always final_suspend() noexcept { return {}; } +}; + +std::coroutine_handle m_handle{}; + +bool operator==(std::default_sentinel_t) const { + return m_handle.done(); +} + +ParserIterator &operator++() { + m_handle.resume(); + return *this; +} + +const ParserRow operator*() const { + return m_handle.promise().m_row; +} + +}; + +} // namespace Bulk_load diff --git a/components/bulk_load/stream_parser/parser_params.h b/components/bulk_load/stream_parser/parser_params.h new file mode 100644 index 000000000000..ffe7b04811eb --- /dev/null +++ b/components/bulk_load/stream_parser/parser_params.h @@ -0,0 +1,52 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. 
The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+// NOTE(review): the header names on the next four #include lines are missing
+// in this copy of the patch - restore them from the original commit.
+#include
+#include
+#include
+#include
+
+namespace Bulk_load {
+
+/**
+  Plain bundle of settings handed to the stream parser. The constructor moves
+  the two terminator strings into place and copies the remaining scalars.
+*/
+struct ParserParams {
+  ParserParams(std::string column_terminator, std::string row_terminator,
+               unsigned char escape_char, unsigned char column_enclose_char,
+               size_t count_row_skip, size_t buffer_size)
+      : m_column_terminator{std::move(column_terminator)},
+        m_row_terminator{std::move(row_terminator)},
+        m_escape_char{escape_char},
+        m_column_enclose_char{column_enclose_char},
+        m_count_row_skip{count_row_skip},
+        m_buffer_size{buffer_size} {}
+  // Separator strings between columns and between rows.
+  std::string m_column_terminator;
+  std::string m_row_terminator;
+  unsigned char m_escape_char;
+  // Character that opens and closes an enclosed column value.
+  unsigned char m_column_enclose_char;
+  // NOTE(review): presumably the number of leading rows to ignore - confirm.
+  size_t m_count_row_skip;
+  // Size in bytes of the parser's read buffer.
+  size_t m_buffer_size;
+};
+
+// NOTE(review): the template argument of std::unique_ptr is missing in this
+// copy of the patch; presumably ParserParams - confirm.
+using ParserParamsPtr = std::unique_ptr;
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/stream_parser/parser_row.h b/components/bulk_load/stream_parser/parser_row.h
new file mode 100644
index 000000000000..ed7eb128f792
--- /dev/null
+++ b/components/bulk_load/stream_parser/parser_row.h
@@ -0,0 +1,38 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+// NOTE(review): the header names on the next two #include lines are missing
+// in this copy of the patch - restore them from the original commit.
+#include
+#include
+
+namespace Bulk_load {
+
+/**
+  Aggregate describing one parsed row. The members have no default
+  initializers, so a default-initialized ParserRow has indeterminate scalar
+  fields; use value-initialization (ParserRow{}) to get zeros.
+
+  NOTE(review): the element types of the two vectors are missing in this copy
+  of the patch - restore them from the original commit.
+*/
+struct ParserRow {
+  // First member, so ParserRow{true} / ParserRow{false} sets only this flag.
+  bool m_is_error;
+  size_t m_num_columns;
+  size_t m_row_idx;
+  // Per-column position and length; NOTE(review): presumably offsets into the
+  // parser's read buffer - confirm.
+  std::vector m_columns;
+  std::vector m_columns_length;
+};
+
+} // namespace Bulk_load
diff --git a/components/bulk_load/stream_parser/stream_parser.cc b/components/bulk_load/stream_parser/stream_parser.cc
new file mode 100644
index 000000000000..8cd6192ef94c
--- /dev/null
+++ b/components/bulk_load/stream_parser/stream_parser.cc
@@ -0,0 +1,109 @@
+/* Copyright (c) 2024, Percona and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation.
The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "stream_parser.h" + +#include +#include + +#include +#include +#include + +namespace Bulk_load { + +StreamParser::StreamParser(size_t num_columns, + ParserParamsPtr parser_params, + DataStreamAbstract *data_stream) + : m_num_columns{num_columns}, + m_parser_params{std::move(parser_params)}, + m_data_stream{data_stream}, + m_unparsed_stream_size{m_data_stream->get_data_size()}, + m_current_buffer_idx{0}, + m_row{} { + m_buffer.reset(static_cast(malloc(m_parser_params->m_buffer_size))); + m_columns.reserve(m_num_columns); + m_columns_length.reserve(m_num_columns); +} + +ParserIterator StreamParser::row_iterator() noexcept { + while (m_unparsed_stream_size > 0) { + const auto len_to_read = std::min(m_unparsed_stream_size, m_parser_params->m_buffer_size); + m_unparsed_buffer_size = m_data_stream->read(m_buffer.get(), len_to_read); + m_current_buffer_idx = 0; + + if (m_unparsed_buffer_size == 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Failed to read data"); + co_return {true}; + } + + int column_data_begin = -1; + int column_data_end = -1; + size_t column_idx = 0; + bool row_ready = false; + + while (m_unparsed_buffer_size > 0) { + if (m_buffer.get()[m_current_buffer_idx] == m_parser_params->m_column_enclose_char) { + if (column_data_begin == -1) { + column_data_begin = column_data_end = 
m_current_buffer_idx; + } + else { + column_data_end = m_current_buffer_idx; + + m_columns[column_idx] = column_data_begin; + m_columns_length[column_idx] = column_data_end - column_data_begin; + + column_data_begin = column_data_end = -1; + ++column_idx; + } + } + else if (m_buffer.get()[m_current_buffer_idx] == m_parser_params->m_column_terminator[0]) { + + } + else if (m_buffer.get()[m_current_buffer_idx] == m_parser_params->m_row_terminator[0]) { + row_ready = true; + } + + ++m_current_buffer_idx; + --m_unparsed_buffer_size; + + if (row_ready) { + co_yield {m_columns, m_columns_length}; + +// for (size_t i = 0; i < column_idx; ++i) { +// auto tmp = std::string(m_buffer.get()[m_columns[i]], m_columns_length[i]); +// std::cout << "!!! row: " << tmp << std::endl; +// } + + column_idx = 0; + } + } + + assert(m_unparsed_buffer_size <= m_unparsed_stream_size); + m_unparsed_stream_size -= m_unparsed_buffer_size; + } + + co_return {false}; +} + +} // namespace Bulk_load diff --git a/components/bulk_load/stream_parser/stream_parser.h b/components/bulk_load/stream_parser/stream_parser.h new file mode 100644 index 000000000000..578e9b164a46 --- /dev/null +++ b/components/bulk_load/stream_parser/stream_parser.h @@ -0,0 +1,58 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. 
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#pragma once
+
+#include "parser_params.h"
+#include "parser_iterator.h"
+#include "parser_row.h"
+#include "components/bulk_load/data_stream/data_stream_abstract.h"
+
+// NOTE(review): the header names on the next three #include lines are missing
+// in this copy of the patch - restore them from the original commit.
+#include
+#include
+#include
+
+namespace Bulk_load {
+
+/**
+  Splits the bytes of a DataStreamAbstract into rows, exposed through the
+  coroutine-based row_iterator().
+
+  NOTE(review): the template arguments of std::unique_ptr and std::vector in
+  the member declarations are missing in this copy of the patch - restore
+  them from the original commit.
+*/
+class StreamParser {
+ public:
+  /**
+    @param num_columns    stored in m_num_columns
+    @param parser_params  parsing settings; ownership is taken
+    @param data_stream    stream to read from; stored as a raw pointer, so it
+                          must outlive the parser
+  */
+  StreamParser(size_t num_columns,
+               ParserParamsPtr parser_params,
+               DataStreamAbstract *data_stream);
+
+  /** Returns the iterator object used to pull parsed rows one at a time. */
+  ParserIterator row_iterator() noexcept;
+
+ private:
+  const size_t m_num_columns;
+  ParserParamsPtr m_parser_params;
+  DataStreamAbstract *m_data_stream;
+  // Read buffer filled from m_data_stream.
+  std::unique_ptr m_buffer;
+  // Bytes of the stream / of the current buffer that are still to be parsed.
+  size_t m_unparsed_stream_size;
+  size_t m_unparsed_buffer_size;
+  // Position of the next byte to examine in m_buffer.
+  size_t m_current_buffer_idx;
+
+  std::vector m_columns;
+  std::vector m_columns_length;
+  ParserRow m_row;
+};
+
+} // namespace Bulk_load
diff --git a/mysql-test/include/plugin.defs b/mysql-test/include/plugin.defs
index ed168b4a773f..4db1cc20062e 100644
--- a/mysql-test/include/plugin.defs
+++ b/mysql-test/include/plugin.defs
@@ -210,3 +210,4 @@ component_binlog_utils_udf plugin_output_directory no BINLOG_UTILS_UD
 component_encryption_udf plugin_output_directory no ENCRYPTION_UDF_COMPONENT
 component_masking_functions plugin_output_directory no MASKING_FUNCTIONS_COMPONENT
 component_percona_udf plugin_output_directory no PERCONA_UDF_COMPONENT
+component_bulk_load plugin_output_directory no BULK_LOAD_COMPONENT
diff --git a/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt b/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt
new file mode 100644
index 000000000000..c6d7afc0eff5
---
/dev/null
+++ b/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt
@@ -0,0 +1,2 @@
+--local_infile
+--secure-file-priv=""
diff --git a/mysql-test/suite/component_bulk_load/t/bulk_load.test b/mysql-test/suite/component_bulk_load/t/bulk_load.test
new file mode 100644
index 000000000000..34bcace5db7f
--- /dev/null
+++ b/mysql-test/suite/component_bulk_load/t/bulk_load.test
@@ -0,0 +1,27 @@
+# Smoke test: install the component, bulk-load a three-row CSV file into an
+# InnoDB table, print the table and clean up.
+INSTALL COMPONENT 'file://component_bulk_load';
+
+--let bulk_load_data_csv=$MYSQL_TMP_DIR/test_data.csv
+
+# Input data: three rows of three double-quoted, comma-separated integers.
+write_file $bulk_load_data_csv;
+"1","2","3"
+"21","22","23"
+"31","32","33"
+EOF
+
+# Echo the input file so that it is visible in the test output.
+--echo ====================================
+--cat_file $bulk_load_data_csv
+--echo ====================================
+
+CREATE TABLE t (
+  a int primary key,
+  b int,
+  c int
+) engine=innodb;
+
+# Load the file through the bulk algorithm.
+--eval LOAD DATA FROM INFILE "$bulk_load_data_csv" INTO TABLE t FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' ALGORITHM = BULK
+
+select * from t;
+drop table t;
+
+# Cleanup: remove the component and the temporary data file.
+UNINSTALL COMPONENT 'file://component_bulk_load';
+--remove_file $bulk_load_data_csv
diff --git a/mysql-test/suite/component_bulk_load/t/suite.opt b/mysql-test/suite/component_bulk_load/t/suite.opt
new file mode 100644
index 000000000000..1696ae6ec25f
--- /dev/null
+++ b/mysql-test/suite/component_bulk_load/t/suite.opt
@@ -0,0 +1 @@
+$BULK_LOAD_COMPONENT_OPT