From d2d19ede1ff6ad46e28c06a3ad7cf6165debfe53 Mon Sep 17 00:00:00 2001 From: Oleksandr Kachan Date: Fri, 20 Sep 2024 16:26:56 +0300 Subject: [PATCH] PS-9187: Bulk ingest feature --- components/bulk_load/CMakeLists.txt | 41 ++++ components/bulk_load/bulk_load_component.cc | 126 ++++++++++ components/bulk_load/bulk_load_component.h | 60 +++++ components/bulk_load/bulk_loader.cc | 58 +++++ components/bulk_load/bulk_loader.h | 37 +++ .../bulk_loader/bulk_loader_abstract.cc | 215 ++++++++++++++++++ .../bulk_loader/bulk_loader_abstract.h | 104 +++++++++ .../bulk_loader/bulk_loader_local.cc | 144 ++++++++++++ .../bulk_load/bulk_loader/bulk_loader_local.h | 40 ++++ .../bulk_load/bulk_loader/bulk_loader_s3.cc | 36 +++ .../bulk_load/bulk_loader/bulk_loader_s3.h | 40 ++++ .../data_stream/data_stream_abstract.h | 37 +++ .../stream_local/data_stream_local.cc | 80 +++++++ .../stream_local/data_stream_local.h | 45 ++++ .../data_stream/stream_s3/data_stream_s3.cc | 29 +++ .../data_stream/stream_s3/data_stream_s3.h | 33 +++ .../bulk_load/stream_parser/parser_params.h | 49 ++++ mysql-test/include/plugin.defs | 1 + .../t/bulk_load-master.opt | 2 + .../component_bulk_load/t/bulk_load.test | 28 +++ .../suite/component_bulk_load/t/suite.opt | 1 + 21 files changed, 1206 insertions(+) create mode 100644 components/bulk_load/CMakeLists.txt create mode 100644 components/bulk_load/bulk_load_component.cc create mode 100644 components/bulk_load/bulk_load_component.h create mode 100644 components/bulk_load/bulk_loader.cc create mode 100644 components/bulk_load/bulk_loader.h create mode 100644 components/bulk_load/bulk_loader/bulk_loader_abstract.cc create mode 100644 components/bulk_load/bulk_loader/bulk_loader_abstract.h create mode 100644 components/bulk_load/bulk_loader/bulk_loader_local.cc create mode 100644 components/bulk_load/bulk_loader/bulk_loader_local.h create mode 100644 components/bulk_load/bulk_loader/bulk_loader_s3.cc create mode 100644 components/bulk_load/bulk_loader/bulk_loader_s3.h create mode 100644 components/bulk_load/data_stream/data_stream_abstract.h create mode 100644 components/bulk_load/data_stream/stream_local/data_stream_local.cc create mode 100644 components/bulk_load/data_stream/stream_local/data_stream_local.h create mode 100644 components/bulk_load/data_stream/stream_s3/data_stream_s3.cc create mode 100644 components/bulk_load/data_stream/stream_s3/data_stream_s3.h create mode 100644 components/bulk_load/stream_parser/parser_params.h create mode 100644 mysql-test/suite/component_bulk_load/t/bulk_load-master.opt create mode 100644 mysql-test/suite/component_bulk_load/t/bulk_load.test create mode 100644 mysql-test/suite/component_bulk_load/t/suite.opt diff --git a/components/bulk_load/CMakeLists.txt b/components/bulk_load/CMakeLists.txt new file mode 100644 index 000000000000..a425826ed102 --- /dev/null +++ b/components/bulk_load/CMakeLists.txt @@ -0,0 +1,41 @@ +# Copyright (c) 2024, Percona and/or its affiliates. + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License, version 2.0, +# as published by the Free Software Foundation. + +# This program is also distributed with certain software (including +# but not limited to OpenSSL) that is licensed under separate terms, +# as designated in a particular file or component or in included license +# documentation. The authors of MySQL hereby grant you an additional +# permission to link the program and your derivative works with the +# separately licensed software that they have included with MySQL. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License, version 2.0, for more details. + +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +ADD_DEFINITIONS(-DLOG_COMPONENT_TAG="bulk_load") + +MYSQL_ADD_COMPONENT(bulk_load + bulk_load_component.cc + bulk_loader.cc + bulk_loader/bulk_loader_abstract.cc + bulk_loader/bulk_loader_local.cc + bulk_loader/bulk_loader_s3.cc + data_stream/stream_local/data_stream_local.cc + data_stream/stream_s3/data_stream_s3.cc + LINK_LIBRARIES extra::rapidjson + MODULE_ONLY +) + +target_include_directories( + component_bulk_load + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} +) diff --git a/components/bulk_load/bulk_load_component.cc b/components/bulk_load/bulk_load_component.cc new file mode 100644 index 000000000000..ab2c5fc38893 --- /dev/null +++ b/components/bulk_load/bulk_load_component.cc @@ -0,0 +1,126 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_load_component.h" + +#include + +#include "mysql/components/service_implementation.h" +#include "mysql/components/services/log_builtins.h" + +#include "bulk_loader.h" +#include "bulk_loader/bulk_loader_abstract.h" + +REQUIRES_SERVICE_PLACEHOLDER(log_builtins); +REQUIRES_SERVICE_PLACEHOLDER(log_builtins_string); + +SERVICE_TYPE(log_builtins) *log_bi; +SERVICE_TYPE(log_builtins_string) *log_bs; + +namespace Bulk_load { + +DEFINE_METHOD(Bulk_loader *, create_bulk_loader, + (THD *thd, my_thread_id connection_id, const TABLE *table, + Bulk_source src, const CHARSET_INFO *charset)) { + return Bulk_load::create_loader(thd, connection_id, table, src, charset); +} + +DEFINE_METHOD(void, set_string, + (Bulk_loader *loader, Bulk_string type, std::string value)) { + static_cast(loader)->set_string(type, std::move(value)); +} + +DEFINE_METHOD(void, set_char, + (Bulk_loader *loader, Bulk_char type, unsigned char value)) { + static_cast(loader)->set_char(type, value); +} + +DEFINE_METHOD(void, set_size, + (Bulk_loader *loader, Bulk_size type, size_t value)) { + static_cast(loader)->set_size(type, value); +} + +DEFINE_METHOD(void, set_condition, + (Bulk_loader *loader, Bulk_condition type, bool value)) { + static_cast(loader)->set_condition(type, value); +} + +DEFINE_METHOD(void, set_compression_algorithm, + (Bulk_loader *loader, Bulk_compression_algorithm algorithm)) { + static_cast(loader)->set_compression_algorithm(algorithm); +} + +DEFINE_METHOD(bool, load, (Bulk_loader *loader, size_t &affected_rows)) { + return static_cast(loader)->load(affected_rows); +} + +DEFINE_METHOD(void, drop_bulk_loader, (THD *thd, Bulk_loader *loader)) { + Bulk_load::drop_loader(thd, static_cast(loader)); +} + +} // namespace Bulk_load + +static mysql_service_status_t component_init() { + log_bi = mysql_service_log_builtins; + log_bs = mysql_service_log_builtins_string; + + return 0; +} + +static mysql_service_status_t component_deinit() { + return 0; +} + +// clang-format off +BEGIN_SERVICE_IMPLEMENTATION(component_bulk_load, bulk_load_driver) +Bulk_load::create_bulk_loader, Bulk_load::set_string, + Bulk_load::set_char, Bulk_load::set_size, + Bulk_load::set_condition, Bulk_load::set_compression_algorithm, + Bulk_load::load, Bulk_load::drop_bulk_loader, + END_SERVICE_IMPLEMENTATION(); + +BEGIN_COMPONENT_PROVIDES(component_bulk_load) +PROVIDES_SERVICE(component_bulk_load, bulk_load_driver), +// PROVIDES_SERVICE(component_bulk_load, log_builtins), +// PROVIDES_SERVICE(component_bulk_load, log_builtins_string), + END_COMPONENT_PROVIDES(); + +BEGIN_COMPONENT_REQUIRES(component_bulk_load) +REQUIRES_SERVICE(registry), REQUIRES_SERVICE(log_builtins), + REQUIRES_SERVICE(log_builtins_string), + // REQUIRES_SERVICE(bulk_data_load), + //REQUIRES_SERVICE(bulk_data_convert), + END_COMPONENT_REQUIRES(); + +BEGIN_COMPONENT_METADATA(component_bulk_load) + METADATA("mysql.author", "Percona Corporation"), + METADATA("mysql.license", "GPL"), +END_COMPONENT_METADATA(); + +DECLARE_COMPONENT(component_bulk_load, "component_bulk_load") + component_init, component_deinit, + END_DECLARE_COMPONENT(); + +DECLARE_LIBRARY_COMPONENTS &COMPONENT_REF(component_bulk_load) + END_DECLARE_LIBRARY_COMPONENTS + +// clang-format on diff --git a/components/bulk_load/bulk_load_component.h b/components/bulk_load/bulk_load_component.h new file mode 100644 index 000000000000..44bbad9da74c --- /dev/null +++ b/components/bulk_load/bulk_load_component.h @@ -0,0 +1,60 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include +#include +#include + +#include "bulk_loader/bulk_loader_abstract.h" + +extern REQUIRES_SERVICE_PLACEHOLDER(registry); +//extern REQUIRES_SERVICE_PLACEHOLDER(bulk_data_load); +//extern REQUIRES_SERVICE_PLACEHOLDER(bulk_data_convert); + +namespace Bulk_load { + +DEFINE_METHOD(Bulk_loader *, create_bulk_loader, + (THD *thd, my_thread_id connection_id, const TABLE *table, + Bulk_source src, const CHARSET_INFO *charset)); + +DEFINE_METHOD(void, set_string, + (Bulk_loader_impl *loader, Bulk_string type, std::string value)); + +DEFINE_METHOD(void, set_char, + (Bulk_loader_impl *loader, Bulk_char type, unsigned char value)); + +DEFINE_METHOD(void, set_size, + (Bulk_loader_impl *loader, Bulk_size type, size_t value)); + +DEFINE_METHOD(void, set_condition, + (Bulk_loader_impl *loader, Bulk_condition type, bool value)); + +DEFINE_METHOD(void, set_compression_algorithm, + (Bulk_loader_impl *loader, Bulk_compression_algorithm algorithm)); + +DEFINE_METHOD(bool, load, (Bulk_loader_impl *loader, size_t &affected_rows)); + +DEFINE_METHOD(void, drop_bulk_loader, (THD *thd, Bulk_loader_impl *loader)); + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader.cc b/components/bulk_load/bulk_loader.cc new file mode 100644 index 000000000000..b82fa5a7ea6b --- /dev/null +++ b/components/bulk_load/bulk_loader.cc @@ -0,0 +1,58 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_loader.h" + +#include "bulk_loader/bulk_loader_local.h" +#include "bulk_loader/bulk_loader_s3.h" + +namespace Bulk_load { + +template +Bulk_loader_impl *create_helper(THD *thd, my_thread_id connection_id, + const TABLE *table, + const CHARSET_INFO *charset) { + return new Bulk_loader_base(thd, connection_id, table, charset); +} + +Bulk_loader_impl *create_loader(THD *thd, my_thread_id connection_id, + const TABLE *table, Bulk_source src, + const CHARSET_INFO *charset) noexcept { + if (src == Bulk_source::OCI) { + return nullptr; // not supported + } + + using CreateFunc = Bulk_loader_impl *(*)(THD *, my_thread_id, const TABLE *, + const CHARSET_INFO *); + static const CreateFunc funcs[static_cast(Bulk_source::S3) + 1] = { + create_helper, + nullptr, + create_helper}; + return (*funcs[static_cast(src)])(thd, connection_id, table, charset); +} + +void drop_loader(THD *thd [[maybe_unused]], + Bulk_loader_impl *loader [[maybe_unused]]) noexcept { + +} + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader.h b/components/bulk_load/bulk_loader.h new file mode 100644 index 000000000000..b565b70bb983 --- /dev/null +++ b/components/bulk_load/bulk_loader.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "bulk_loader/bulk_loader_abstract.h" + +#include "mysql/components/services/bulk_load_service.h" + +namespace Bulk_load { + +Bulk_loader_impl *create_loader(THD *thd, my_thread_id connection_id, + const TABLE *table, Bulk_source src, + const CHARSET_INFO *charset) noexcept; + +void drop_loader(THD *thd, Bulk_loader_impl *loader) noexcept; + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_abstract.cc b/components/bulk_load/bulk_loader/bulk_loader_abstract.cc new file mode 100644 index 000000000000..46e2530698f6 --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_abstract.cc @@ -0,0 +1,215 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_loader_abstract.h" + +#include +#include + +#include +#include + +namespace Bulk_load { + +Bulk_loader_impl::Bulk_loader_impl(THD *thd, my_thread_id connection_id, + const TABLE *table, + const CHARSET_INFO *charset) + : m_thd{thd} + , m_connection_id{connection_id} + , m_table{table} + , m_charset{charset} + , m_compression_algorithm{Bulk_compression_algorithm::NONE} {} + +THD *Bulk_loader_impl::get_thd() noexcept { + return m_thd; +} + +my_thread_id Bulk_loader_impl::get_connection_id() const noexcept { + return m_connection_id; +} + +const TABLE *Bulk_loader_impl::get_table() noexcept { + return m_table; +} + +const CHARSET_INFO *Bulk_loader_impl::get_charset() noexcept { + return m_charset; +} + +void Bulk_loader_impl::set_string(Bulk_string type, std::string value) noexcept { + m_string_attrs[type] = std::move(value); +} + +void Bulk_loader_impl::set_char(Bulk_char type, unsigned char value) noexcept { + m_char_attrs[type] = value; +} + +void Bulk_loader_impl::set_size(Bulk_size type, size_t value) noexcept { + m_size_attrs[type] = value; +} + +void Bulk_loader_impl::set_condition(Bulk_condition type, bool value) noexcept { + m_condition_attrs[type] = value; +} + +std::string Bulk_loader_impl::get_string(Bulk_string type) const noexcept { + const auto it = m_string_attrs.find(type); + return it != m_string_attrs.cend() ? it->second : ""; +} + +unsigned char Bulk_loader_impl::get_char(Bulk_char type) const noexcept { + const auto it = m_char_attrs.find(type); + return it != m_char_attrs.cend() ? it->second : '\0'; +} + +size_t Bulk_loader_impl::get_size(Bulk_size type) const noexcept { + const auto it = m_size_attrs.find(type); + return it != m_size_attrs.cend() ? it->second : 0; +} + +bool Bulk_loader_impl::get_condition(Bulk_condition type) const noexcept { + const auto it = m_condition_attrs.find(type); + return it != m_condition_attrs.cend() ? it->second : false; +} + +void Bulk_loader_impl::set_compression_algorithm( + Bulk_compression_algorithm algorithm) noexcept { + m_compression_algorithm = algorithm; +} + +Bulk_compression_algorithm Bulk_loader_impl::get_compression_algorithm() const noexcept { + return m_compression_algorithm; +} + +bool Bulk_loader_impl::acquire_services() noexcept { + m_srv_registry = mysql_plugin_registry_acquire(); + + if (m_srv_registry == nullptr) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot acquire registry"); + return false; + } + + if (m_srv_registry->acquire("bulk_data_convert", &m_svc_data_convert) != 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot acquire data service"); + return false; + } + + if (m_srv_registry->acquire("bulk_data_load", &m_svc_data_load) != 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot acquire load service"); + return false; + } + + return true; +} + +void Bulk_loader_impl::release_services() noexcept { + if (m_srv_registry != nullptr) { + if (m_svc_data_convert != nullptr && + m_srv_registry->release(m_svc_data_convert) != 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot release data service"); + } + + if (m_svc_data_load != nullptr && + m_srv_registry->release(m_svc_data_load) != 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot release load service"); + } + + if (mysql_plugin_registry_release(m_srv_registry) != 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot release registry"); + } + } +} + +SERVICE_TYPE(bulk_data_convert) *Bulk_loader_impl::data_convert_service() noexcept { + return reinterpret_cast(m_svc_data_convert); +} + +SERVICE_TYPE(bulk_data_load) *Bulk_loader_impl::data_load_service() noexcept { + return reinterpret_cast(m_svc_data_load); +} + +void *Bulk_loader_impl::start_session(size_t data_size, size_t se_memory_size, size_t num_threads) noexcept { + m_load_session_start_done = true; + auto *load_ctx = data_load_service()->begin(m_thd, m_table, data_size, se_memory_size, num_threads); + + if (load_ctx == nullptr) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Failed to start session"); + } + + return load_ctx; +} + +void Bulk_loader_impl::end_session(void *load_ctx) noexcept { + if (!m_load_session_start_done) { + return; + } + + if (!data_load_service()->end(m_thd, load_ctx, m_table, load_ctx == nullptr)) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Failed to end session"); + } + + m_load_session_start_done = false; +} + +ParserParamsPtr Bulk_loader_impl::get_parser_params() const noexcept { + return std::make_unique( + get_string(Bulk_string::COLUMN_TERM), + get_string(Bulk_string::ROW_TERM), + get_char(Bulk_char::ESCAPE_CHAR), + get_char(Bulk_char::ENCLOSE_CHAR), + get_size(Bulk_size::COUNT_ROW_SKIP) + ); +} + +void Bulk_loader_impl::settings_dump_to_log() { + std::cout << "Bulk_loader_impl settings =====================================" << std::endl; + std::cout << "schema_name: " << get_string(Bulk_string::SCHEMA_NAME) << std::endl; + std::cout << "table_name: " << get_string(Bulk_string::TABLE_NAME) << std::endl; + std::cout << "file_prefix: " << get_string(Bulk_string::FILE_PREFIX) << std::endl; + std::cout << "file_suffix: " << get_string(Bulk_string::FILE_SUFFIX) << std::endl; + std::cout << "column_term: " << get_string(Bulk_string::COLUMN_TERM) << std::endl; + std::cout << "row_term: " << get_string(Bulk_string::ROW_TERM) << std::endl; + std::cout << "append_to_last_prefix: " << get_string(Bulk_string::APPENDTOLASTPREFIX) << std::endl; + std::cout << "escape_char: " << get_char(Bulk_char::ESCAPE_CHAR) << std::endl; + std::cout << "enclose_char: " << get_char(Bulk_char::ENCLOSE_CHAR) << std::endl; + std::cout << "count_files: " << get_size(Bulk_size::COUNT_FILES) << std::endl; + std::cout << "count_row_skip: " << get_size(Bulk_size::COUNT_ROW_SKIP) << std::endl; + std::cout << "count_columns: " << get_size(Bulk_size::COUNT_COLUMNS) << std::endl; + std::cout << "concurrency: " << get_size(Bulk_size::CONCURRENCY) << std::endl; + std::cout << "memory: " << get_size(Bulk_size::MEMORY) << std::endl; + std::cout << "start_index: " << get_size(Bulk_size::START_INDEX) << std::endl; + std::cout << "ordered_data: " << get_condition(Bulk_condition::ORDERED_DATA) << std::endl; + std::cout << "optional_enclose: " << get_condition(Bulk_condition::OPTIONAL_ENCLOSE) << std::endl; + std::cout << "dryrun: " << get_condition(Bulk_condition::DRYRUN) << std::endl; + std::cout << "compression_algorithm: " << (m_compression_algorithm == Bulk_compression_algorithm::NONE ? "NONE" : "ZSTD") << std::endl; + std::cout << "end settings ==================================================" << std::endl; +} + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_abstract.h b/components/bulk_load/bulk_loader/bulk_loader_abstract.h new file mode 100644 index 000000000000..37edd8c62589 --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_abstract.h @@ -0,0 +1,104 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "mysql/service_plugin_registry.h" +#include "mysql/components/service.h" +#include "mysql/components/services/registry.h" +#include "mysql/components/services/bulk_data_service.h" +#include "mysql/components/services/bulk_load_service.h" + +#include "stream_parser/parser_params.h" + +#include +#include + +namespace Bulk_load { + +class Bulk_loader_impl { + public: + Bulk_loader_impl(THD *thd, my_thread_id connection_id, const TABLE *table, + const CHARSET_INFO *charset); + + Bulk_loader_impl(const Bulk_loader_impl &other) = delete; + Bulk_loader_impl(Bulk_loader_impl &&other) = delete; + virtual ~Bulk_loader_impl() = default; + Bulk_loader_impl &operator=(const Bulk_loader_impl &other) = delete; + Bulk_loader_impl &operator=(Bulk_loader_impl &&other) = delete; + + void set_string(Bulk_string type, std::string value) noexcept; + void set_char(Bulk_char type, unsigned char value) noexcept; + void set_size(Bulk_size type, size_t value) noexcept; + void set_condition(Bulk_condition type, bool value) noexcept; + void set_compression_algorithm(Bulk_compression_algorithm algorithm) noexcept; + + virtual bool load(size_t &affected_rows) noexcept = 0; + + protected: + // temporary debug + void settings_dump_to_log(); + + THD *get_thd() noexcept; + my_thread_id get_connection_id() const noexcept; + const TABLE *get_table() noexcept; + const CHARSET_INFO *get_charset() noexcept; + + std::string get_string(Bulk_string type) const noexcept; + unsigned char get_char(Bulk_char type) const noexcept; + size_t get_size(Bulk_size type) const noexcept; + bool get_condition(Bulk_condition type) const noexcept; + Bulk_compression_algorithm get_compression_algorithm() const noexcept; + + bool acquire_services() noexcept; + void release_services() noexcept; + SERVICE_TYPE(bulk_data_convert) *data_convert_service() noexcept; + SERVICE_TYPE(bulk_data_load) *data_load_service() noexcept; + + void *start_session(size_t data_size, size_t se_memory_size, size_t num_threads) noexcept; + void end_session(void *load_ctx) noexcept; + + ParserParamsPtr get_parser_params() const noexcept; + + private: + THD *m_thd; + my_thread_id m_connection_id; + const TABLE *m_table; + const CHARSET_INFO *m_charset; + + std::unordered_map m_string_attrs; + std::unordered_map m_char_attrs; + std::unordered_map m_size_attrs; + std::unordered_map m_condition_attrs; + + Bulk_compression_algorithm m_compression_algorithm; + + SERVICE_TYPE(registry) *m_srv_registry = nullptr; + my_h_service m_svc_data_convert = nullptr; + my_h_service m_svc_data_load = nullptr; + bool m_load_session_start_done = false; +}; + +template +class Bulk_loader_base; + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_local.cc b/components/bulk_load/bulk_loader/bulk_loader_local.cc new file mode 100644 index 000000000000..22690c0e5b5c --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_local.cc @@ -0,0 +1,144 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_loader_local.h" + +#include "components/bulk_load/data_stream/stream_local/data_stream_local.h" + +#include "scope_guard.h" + +#include +#include + +namespace Bulk_load { + +Bulk_loader_base::Bulk_loader_base( + THD *thd, my_thread_id connection_id, const TABLE *table, + const CHARSET_INFO *charset) + : Bulk_loader_impl(thd, connection_id, table, charset) {} + +bool Bulk_loader_local::load(size_t &affected_rows [[maybe_unused]]) noexcept { + settings_dump_to_log(); + + void *load_ctx = nullptr; + + auto cleanup_guard = create_scope_guard([&]() { + end_session(load_ctx); + release_services(); + }); + + if (!acquire_services()) { + return false; + } + + // init stream reader + auto data_stream = DataStreamLocal{get_string(Bulk_string::FILE_PREFIX)}; + if (!data_stream.open()) { + return false; + } + + // init stream parser + auto parser_params = get_parser_params(); + StreamParser stream_parser{std::move(parser_params)}; + + auto *thd = get_thd(); + const auto *table = get_table(); + const auto *charset = get_charset(); + const auto *srv_data_convert = data_convert_service(); + const auto *srv_data_load = data_load_service(); + + const auto data_size = data_stream.get_data_size(); + const auto se_memory_size = srv_data_load->get_se_memory_size(thd, table); + + std::cout << "Bulk_loader_local::load file_size: " << data_size << std::endl; + std::cout << "Bulk_loader_local::load se_memory_size: " << se_memory_size << std::endl; + + load_ctx = start_session(data_size, se_memory_size, 1); + + if (load_ctx == nullptr) { + return false; + } + + Row_meta row_metadata; + + if (!srv_data_convert->get_row_metadata(thd, table, false, row_metadata)) { + std::cout << "Bulk_loader_local::load srv_data_convert->get_row_metadata failed" << std::endl; + } + + char buffer[100] = ""; + size_t buffer_length = 100; + + Rows_text text_rows{3}; + text_rows.set_num_rows(1); + size_t next_index = 0; + + int idx = 0; + std::string col1 = "1"; + std::string col2 = "2"; + std::string col3 = "3"; + + text_rows.process_columns(0, [&](Column_text &column, bool last_col [[maybe_unused]]) { + switch (idx) { + case 0: + column.m_data_ptr = col1.c_str(); + column.m_data_len = 1; + break; + case 1: + column.m_data_ptr = col2.c_str(); + column.m_data_len = 1; + break; + case 2: + column.m_data_ptr = col3.c_str(); + column.m_data_len = 1; + break; + } + + ++idx; + + return true; + }); + + std::cout << "Bulk_loader_local::load text_rows.get_num_rows: " << text_rows.get_num_rows() << std::endl; + std::cout << "Bulk_loader_local::load text_rows.get_num_cols: " << text_rows.get_num_cols() << std::endl; + + Rows_mysql sql_rows{3}; + Bulk_load_error_location_details error_details; + + auto ret = srv_data_convert->mysql_format(thd, table, text_rows, next_index, buffer, buffer_length, charset, row_metadata, sql_rows, error_details); + + std::cout << "Bulk_loader_local::load srv_data_convert->mysql_format_from_raw ret: " << ret << std::endl; + std::cout << "Bulk_loader_local::load sql_rows.get_num_rows: " << sql_rows.get_num_rows() << std::endl; + std::cout << "Bulk_loader_local::load sql_rows.get_num_cols: " << sql_rows.get_num_cols() << std::endl; + + Bulk_load::Stat_callbacks wait_cbks{ + []() {}, + []() {} + }; + + if (!srv_data_load->load(thd, load_ctx, table, sql_rows, 0, wait_cbks)) { + std::cout << "Bulk_loader_local::load srv_data_load->load failed" << std::endl; + } + + return true; +} + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_local.h b/components/bulk_load/bulk_loader/bulk_loader_local.h new file mode 100644 index 000000000000..aec017c1ed2c --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_local.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "bulk_loader_abstract.h" + +namespace Bulk_load { + +template <> +class Bulk_loader_base : public Bulk_loader_impl { + public: + Bulk_loader_base(THD *thd, my_thread_id connection_id, + const TABLE *table, const CHARSET_INFO *charset); + + bool load(size_t &affected_rows) noexcept override; +}; + +using Bulk_loader_local = Bulk_loader_base; + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_s3.cc b/components/bulk_load/bulk_loader/bulk_loader_s3.cc new file mode 100644 index 000000000000..d094eb538b5b --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_s3.cc @@ -0,0 +1,36 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "bulk_loader_s3.h" + +namespace Bulk_load { + +Bulk_loader_base::Bulk_loader_base( + THD *thd, my_thread_id connection_id, const TABLE *table, + const CHARSET_INFO *charset) + : Bulk_loader_impl(thd, connection_id, table, charset) {} + +bool Bulk_loader_s3::load(size_t &affected_rows [[maybe_unused]]) noexcept { + return true; +} + +} // namespace Bulk_load diff --git a/components/bulk_load/bulk_loader/bulk_loader_s3.h b/components/bulk_load/bulk_loader/bulk_loader_s3.h new file mode 100644 index 000000000000..7fda5b89ca1f --- /dev/null +++ b/components/bulk_load/bulk_loader/bulk_loader_s3.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "bulk_loader_abstract.h" + +namespace Bulk_load { + +template <> +class Bulk_loader_base : public Bulk_loader_impl { + public: + Bulk_loader_base(THD *thd, my_thread_id connection_id, + const TABLE *table, const CHARSET_INFO *charset); + + bool load(size_t &affected_rows) noexcept override; +}; + +using Bulk_loader_s3 = Bulk_loader_base; + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/data_stream_abstract.h b/components/bulk_load/data_stream/data_stream_abstract.h new file mode 100644 index 000000000000..eed8c7f87a4e --- /dev/null +++ b/components/bulk_load/data_stream/data_stream_abstract.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include + +namespace Bulk_load { + +class DataStreamAbstract { + public: + virtual ~DataStreamAbstract() = default; + + virtual bool open() noexcept = 0; + virtual size_t get_data_size() noexcept = 0; +}; + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_local/data_stream_local.cc b/components/bulk_load/data_stream/stream_local/data_stream_local.cc new file mode 100644 index 000000000000..7926de9382bd --- /dev/null +++ b/components/bulk_load/data_stream/stream_local/data_stream_local.cc @@ -0,0 +1,80 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "data_stream_local.h" + +#include +#include + +#include +#include +#include + +namespace Bulk_load { + +DataStreamLocal::DataStreamLocal(std::filesystem::path path) + : m_file_path{std::move(path)} {} + +DataStreamLocal::~DataStreamLocal() { + m_stream.close(); +} + +bool DataStreamLocal::open() noexcept { + if (m_file_path.empty()) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Empty file name"); + return false; + } + + if (!std::filesystem::exists(m_file_path)) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Cannot find file"); + return false; + } + + const auto file_size = std::filesystem::file_size(m_file_path); + + if (file_size == 0) { + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + "Empty file"); + return false; + } + + m_stream.open(m_file_path, std::ios_base::in | std::ios_base::binary); + + if (!m_stream.is_open()) { + std::stringstream error; + error << "Failed to open file: " << strerror(errno); + LogComponentErr(ERROR_LEVEL, ER_BULK_LOADER_COMPONENT_ERROR, + error.str().c_str()); + return false; + } + + return true; +} + +size_t DataStreamLocal::get_data_size() noexcept { + assert(!m_file_path.empty()); + return std::filesystem::file_size(m_file_path); +} + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_local/data_stream_local.h b/components/bulk_load/data_stream/stream_local/data_stream_local.h new file mode 100644 index 000000000000..50a08147d1da --- /dev/null +++ b/components/bulk_load/data_stream/stream_local/data_stream_local.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "components/bulk_load/data_stream/data_stream_abstract.h" + +#include +#include + +namespace Bulk_load { + +class DataStreamLocal : public DataStreamAbstract { + public: + explicit DataStreamLocal(std::filesystem::path path); + ~DataStreamLocal() override; + + bool open() noexcept override; + size_t get_data_size() noexcept override; + + private: + std::filesystem::path m_file_path; + std::ifstream m_stream; +}; + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc b/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc new file mode 100644 index 000000000000..23396991d899 --- /dev/null +++ b/components/bulk_load/data_stream/stream_s3/data_stream_s3.cc @@ -0,0 +1,29 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include "data_stream_s3.h" + +namespace Bulk_load { + + + +} // namespace Bulk_load diff --git a/components/bulk_load/data_stream/stream_s3/data_stream_s3.h b/components/bulk_load/data_stream/stream_s3/data_stream_s3.h new file mode 100644 index 000000000000..5d142ebe872b --- /dev/null +++ b/components/bulk_load/data_stream/stream_s3/data_stream_s3.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include "components/bulk_load/data_stream/data_stream_abstract.h" + +namespace Bulk_load { + +class DataStreamS3 : public DataStreamAbstract { + +}; + +} // namespace Bulk_load diff --git a/components/bulk_load/stream_parser/parser_params.h b/components/bulk_load/stream_parser/parser_params.h new file mode 100644 index 000000000000..628a4c96879d --- /dev/null +++ b/components/bulk_load/stream_parser/parser_params.h @@ -0,0 +1,49 @@ +/* Copyright (c) 2024, Percona and/or its affiliates. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2.0, + as published by the Free Software Foundation. + + This program is also distributed with certain software (including + but not limited to OpenSSL) that is licensed under separate terms, + as designated in a particular file or component or in included license + documentation. The authors of MySQL hereby grant you an additional + permission to link the program and your derivative works with the + separately licensed software that they have included with MySQL. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License, version 2.0, for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#pragma once + +#include +#include +#include + +namespace Bulk_load { + +struct ParserParams { + ParserParams(std::string column_terminator, std::string row_terminator, + uchar escape_char, uchar column_enclose_char, + size_t count_row_skip) + : column_terminator{std::move(column_terminator)}, + row_terminator{std::move(row_terminator)}, + escape_char{escape_char}, + column_enclose_char{column_enclose_char}, + count_row_skip{count_row_skip} {} + std::string column_terminator; + std::string row_terminator; + uchar escape_char; + uchar column_enclose_char; + size_t count_row_skip; +}; + +using ParserParamsPtr = std::unique_ptr; + +} // namespace Bulk_load diff --git a/mysql-test/include/plugin.defs b/mysql-test/include/plugin.defs index ed168b4a773f..4db1cc20062e 100644 --- a/mysql-test/include/plugin.defs +++ b/mysql-test/include/plugin.defs @@ -210,3 +210,4 @@ component_binlog_utils_udf plugin_output_directory no BINLOG_UTILS_UD component_encryption_udf plugin_output_directory no ENCRYPTION_UDF_COMPONENT component_masking_functions plugin_output_directory no MASKING_FUNCTIONS_COMPONENT component_percona_udf plugin_output_directory no PERCONA_UDF_COMPONENT +component_bulk_load plugin_output_directory no BULK_LOAD_COMPONENT diff --git a/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt b/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt new file mode 100644 index 000000000000..c6d7afc0eff5 --- /dev/null +++ b/mysql-test/suite/component_bulk_load/t/bulk_load-master.opt @@ -0,0 +1,2 @@ +--local_infile +--secure-file-priv="" diff --git a/mysql-test/suite/component_bulk_load/t/bulk_load.test b/mysql-test/suite/component_bulk_load/t/bulk_load.test new file mode 100644 index 000000000000..e25a7987a476 --- /dev/null +++ b/mysql-test/suite/component_bulk_load/t/bulk_load.test @@ -0,0 +1,28 @@ +INSTALL COMPONENT 'file://component_bulk_load'; + +--let bulk_load_data_csv=$MYSQL_TMP_DIR/test_data.csv + +write_file $bulk_load_data_csv; +a,b,c +"1","2","3" +"21","22","23" +"31","32","33" +EOF + +--echo ==================================== +--cat_file $bulk_load_data_csv +--echo ==================================== + +CREATE TABLE t ( + a int primary key, + b int, + c int +) engine=innodb; + +--eval LOAD DATA FROM INFILE "$bulk_load_data_csv" INTO TABLE t FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' ALGORITHM = BULK + +select * from t; +drop table t; + +UNINSTALL COMPONENT 'file://component_bulk_load'; +--remove_file $bulk_load_data_csv diff --git a/mysql-test/suite/component_bulk_load/t/suite.opt b/mysql-test/suite/component_bulk_load/t/suite.opt new file mode 100644 index 000000000000..1696ae6ec25f --- /dev/null +++ b/mysql-test/suite/component_bulk_load/t/suite.opt @@ -0,0 +1 @@ +$BULK_LOAD_COMPONENT_OPT