[GLUTEN-7387][CH] Allow parallel downloading in scan operator for hive text/json table when the whole compressed (non-bzip2) file is a single file split (#7598)

* enable parallel downloading for text/json

* wip

* wip

* finish dev

* update version

* update initialization of thread pool

* fix style
taiyang-li authored Nov 14, 2024
1 parent 21b4e65 commit f8a2dca
Showing 4 changed files with 73 additions and 8 deletions.
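
The core idea of the change: when a hive text/json scan reads a remote file whose single split covers the whole (non-bzip2) compressed file, the read buffer is wrapped in a ParallelReadBuffer so several chunks download concurrently. A simplified model of the gating logic, in plain C++ with illustrative names (not the actual Gluten API):

#include <cstddef>

enum class Format { Text, Json, Parquet };
enum class Compression { None, Gzip, Bzip2 };

struct FileSplit
{
    Format format;
    Compression compression;
    bool remote;        // served via S3/HDFS rather than local disk
    size_t file_size;   // total object size, known up front
};

/// Parallel download pays off only when the format is consumed serially
/// (text/json), the source is remote, and the file is big enough for several
/// chunks to be in flight at once (the 4x factor mirrors the threshold used
/// in wrapWithParallelIfNeeded below).
bool shouldDownloadInParallel(const FileSplit & split, size_t max_threads, size_t buffer_size)
{
    if (split.format != Format::Text && split.format != Format::Json)
        return false;
    if (split.compression == Compression::Bzip2)  // bzip2 splits take the SplittableBzip2ReadBuffer path instead
        return false;
    if (!split.remote || max_threads <= 1)
        return false;
    return split.file_size >= 4 * buffer_size;
}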
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241111
-CH_COMMIT=06ffc32a255
+CH_COMMIT=3f7e46d4e9e
17 changes: 16 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
@@ -32,6 +32,7 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Defines.h>
#include <Core/NamesAndTypes.h>
+#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
@@ -86,6 +87,14 @@ extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TYPE;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}

+namespace ServerSetting
+{
+extern const ServerSettingsUInt64 max_thread_pool_size;
+extern const ServerSettingsUInt64 thread_pool_queue_size;
+extern const ServerSettingsUInt64 max_io_thread_pool_size;
+extern const ServerSettingsUInt64 io_thread_pool_queue_size;
+}
}

namespace local_engine
@@ -757,6 +766,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
}
}

/// Finally apply some fixed kvs to settings.
settings.set("join_use_nulls", true);
settings.set("input_format_orc_allow_missing_columns", true);
@@ -970,7 +980,12 @@ void BackendInitializerUtil::initBackend(const SparkConfigs::ConfigMap & spark_c
initCompiledExpressionCache(config);
LOG_INFO(logger, "Init compiled expressions cache factory.");

-GlobalThreadPool::initialize();
+ServerSettings server_settings;
+server_settings.loadSettingsFromConfig(*config);
+GlobalThreadPool::initialize(
+    server_settings[ServerSetting::max_thread_pool_size], 0, server_settings[ServerSetting::thread_pool_queue_size]);
+getIOThreadPool().initialize(
+    server_settings[ServerSetting::max_io_thread_pool_size], 0, server_settings[ServerSetting::io_thread_pool_queue_size]);

const size_t active_parts_loading_threads = config->getUInt("max_active_parts_loading_thread_pool_size", 64);
DB::getActivePartsLoadingThreadPool().initialize(
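
Note on the hunk above: the global and IO pools are now sized from ClickHouse's ServerSettings instead of the previous default-constructed GlobalThreadPool. A minimal sketch of how the IO pool initialized here gets consumed later, assuming the SharedThreadPools/threadPoolCallbackRunner signatures of the pinned ClickHouse branch:

#include <IO/SharedThreadPools.h>
#include <Common/threadPoolCallbackRunner.h>

void scheduleOnIoPool()
{
    /// The runner posts tasks onto the shared IO pool; "Example" becomes the
    /// worker thread-name prefix, and {} requests default priority.
    auto runner = DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "Example");
    auto future = runner([] { /* e.g. download one chunk */ }, {});
    future.get();  // propagates any exception thrown by the task
}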
55 changes: 49 additions & 6 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -31,6 +31,7 @@
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
+#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
@@ -70,6 +71,9 @@ namespace Setting
extern const SettingsUInt64 s3_max_redirects;
extern const SettingsBool s3_disable_checksum;
extern const SettingsUInt64 s3_retry_attempts;
+extern const SettingsMaxThreads max_download_threads;
+extern const SettingsUInt64 max_download_buffer_size;
+extern const SettingsBool input_format_allow_seeks;
}
namespace ErrorCodes
{
@@ -183,7 +187,7 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> read_buffer, const s
return std::move(read_buffer);

/// Skip text/json files with compression.
-/// TODO implement adjustFileReadPosition when compression method is bzip2
+/// When the file is compressed, its read range is adjusted in [[buildWithCompressionWrapper]]
Poco::URI file_uri(file_info.uri_file());
DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
if (compression != CompressionMethod::None)
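
The early exit above relies on extension sniffing: chooseCompressionMethod with the "auto" hint maps the file suffix to a codec. A small illustration (behavior assumed from the pinned ClickHouse branch):

#include <IO/CompressionMethod.h>
#include <cassert>

void compressionFromExtension()
{
    using DB::chooseCompressionMethod;
    using DB::CompressionMethod;
    assert(chooseCompressionMethod("part-00000.json.gz", "auto") == CompressionMethod::Gzip);
    assert(chooseCompressionMethod("part-00000.txt.bz2", "auto") == CompressionMethod::Bzip2);
    assert(chooseCompressionMethod("part-00000.txt", "auto") == CompressionMethod::None);
}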
@@ -216,6 +220,8 @@ class LocalFileReadBufferBuilder
explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
~LocalFileReadBufferBuilder() override = default;

+bool isRemote() const override { return false; }

std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
{
@@ -660,10 +666,6 @@ void registerReadBufferBuilders()
#endif
}

-ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : context(context_)
-{
-}

DB::ReadSettings ReadBufferBuilder::getReadSettings() const
{
DB::ReadSettings read_settings = context->getReadSettings();
@@ -678,6 +680,10 @@ DB::ReadSettings ReadBufferBuilder::getReadSettings() const
return read_settings;
}

+ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : context(context_)
+{
+}

std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
{
@@ -758,7 +764,11 @@ ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFi
if (compression == CompressionMethod::Bzip2)
return wrapWithBzip2(std::move(in), file_info);
else
-return wrapReadBufferWithCompressionMethod(std::move(in), compression);
+{
+    /// In this case we are pretty sure the current split covers the whole file, because only bzip2 compression is splittable
+    auto parallel = wrapWithParallelIfNeeded(std::move(in), file_info);
+    return wrapReadBufferWithCompressionMethod(std::move(parallel), compression);
+}
}

ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
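
The comment in the else-branch leans on a Spark scheduling fact: gzip/zstd/deflate files are not splittable, so Spark hands the whole file to a single task, and the split necessarily starts at byte 0 and spans the full object. A hedged sketch of how that invariant could be asserted from the Substrait split metadata (start/length are standard FileOrFiles fields; the helper name is illustrative, not part of this commit):

/// True when the split [start, start + length) is exactly the whole file.
bool splitCoversWholeFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, size_t file_size)
{
    return file_info.start() == 0 && file_info.length() == file_size;
}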
@@ -843,6 +853,39 @@ void ReadBufferBuilder::updateCaches(const String & key, size_t last_modified_ti
}
}

+std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::wrapWithParallelIfNeeded(
+    std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
+{
+    /// Only use parallel downloading for text and json formats, because data in those formats is read serially.
+    if (!file_info.has_text() && !file_info.has_json())
+        return std::move(in);
+
+    const auto & settings = context->getSettingsRef();
+    auto max_download_threads = settings[DB::Setting::max_download_threads];
+    auto max_download_buffer_size = settings[DB::Setting::max_download_buffer_size];
+
+    bool parallel_read = isRemote() && max_download_threads > 1 && isBufferWithFileSize(*in);
+    if (!parallel_read)
+        return std::move(in);
+
+    size_t file_size = getFileSizeFromReadBuffer(*in);
+    if (file_size < 4 * max_download_buffer_size)
+        return std::move(in);
+
+    LOG_TRACE(
+        getLogger("ReadBufferBuilder"),
+        "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
+        max_download_threads,
+        max_download_buffer_size);
+
+    return wrapInParallelReadBufferIfSupported(
+        {std::move(in)},
+        DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "ParallelRead"),
+        max_download_threads,
+        max_download_buffer_size,
+        file_size);
+}

ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
static ReadBufferBuilderFactory instance;
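
wrapInParallelReadBufferIfSupported only kicks in when the inner buffer supports random-range reads (readBigAt, e.g. S3); it then carves [0, file_size) into range_step chunks fetched by up to max_download_threads workers while the caller keeps reading the stream sequentially. Back-of-envelope, with the ClickHouse defaults assumed here (max_download_threads = 4, max_download_buffer_size = 10 MiB — check your build):

#include <cstddef>
#include <cstdio>

int main()
{
    const size_t file_size = 200ULL << 20;   // a 200 MiB object
    const size_t range_step = 10ULL << 20;   // max_download_buffer_size
    const size_t workers = 4;                // max_download_threads
    const size_t chunks = (file_size + range_step - 1) / range_step;
    // 20 chunks, at most 4 in flight; files under 4 * range_step (40 MiB)
    // skip the wrapper entirely per the size check above.
    std::printf("%zu chunks, up to %zu downloaded concurrently\n", chunks, workers);
    return 0;
}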
7 changes: 7 additions & 0 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -19,6 +19,7 @@
#include <functional>
#include <memory>
#include <Disks/ObjectStorages/StoredObject.h>
+#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <substrait/plan.pb.h>
@@ -35,6 +36,8 @@ class ReadBufferBuilder

virtual ~ReadBufferBuilder() = default;

+virtual bool isRemote() const { return true; }

/// build a new read buffer
virtual std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) = 0;
@@ -55,7 +58,11 @@ class ReadBufferBuilder
size_t last_modified_time,
size_t file_size);

+std::unique_ptr<DB::ReadBuffer>
+wrapWithParallelIfNeeded(std::unique_ptr<DB::ReadBuffer> in, const substrait::ReadRel::LocalFiles::FileOrFiles & file_info);

DB::ReadSettings getReadSettings() const;

DB::ContextPtr context;

private:
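
isRemote() defaulting to true means every builder except the local-file one opts into parallel download automatically. A hypothetical subclass (illustrative only, not part of this commit) would look like:

#include <IO/ReadBufferFromFile.h>
#include <Poco/URI.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>

class ExampleLocalBuilder : public local_engine::ReadBufferBuilder
{
public:
    explicit ExampleLocalBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }

    /// Reports a local source, so wrapWithParallelIfNeeded stays a no-op.
    bool isRemote() const override { return false; }

    std::unique_ptr<DB::ReadBuffer>
    build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info) override
    {
        Poco::URI uri(file_info.uri_file());
        return std::make_unique<DB::ReadBufferFromFile>(uri.getPath());
    }
};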
