Skip to content

Commit

Permalink
Merge branch 'master' into function-1
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Feb 2, 2024
2 parents 0f7e4e9 + f55ca1e commit 9f4ff34
Show file tree
Hide file tree
Showing 54 changed files with 992 additions and 331 deletions.
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_rowset_builder.h"
#include "olap/delta_writer.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -44,6 +45,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
}

tasks.emplace_back([writer] {
ThreadLocalHandle::create_thread_local_if_not_exits();
std::lock_guard<bthread::Mutex> lock(writer->_mtx);
if (writer->_is_init || writer->_is_cancelled) {
return Status::OK();
Expand All @@ -57,7 +59,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {

Status CloudDeltaWriter::write(const vectorized::Block* block,
const std::vector<uint32_t>& row_idxs, bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
if (row_idxs.empty() && !is_append) [[unlikely]] {
return Status::OK();
}
std::lock_guard lock(_mtx);
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ Status CloudRowsetBuilder::init() {
context.rowset_dir = _tablet->tablet_path();
_rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));

// TODO(plat1ko):
//_calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token();
_calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token();

RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), true));

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ Status CloudStorageEngine::open() {
// TODO(plat1ko): Use file cache disks number?
_memtable_flush_executor->init(1);

_calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
_calc_delete_bitmap_executor->init();

return Status::OK();
}

Expand Down
9 changes: 6 additions & 3 deletions be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
partition_ids.insert(tablet_writer_it->second->partition_id());
}
if (!partition_ids.empty()) {
RETURN_IF_ERROR(_init_writers_by_parition_ids(partition_ids));
RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
}

return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
}

Status CloudTabletsChannel::_init_writers_by_parition_ids(
Status CloudTabletsChannel::_init_writers_by_partition_ids(
const std::unordered_set<int64_t>& partition_ids) {
std::vector<CloudDeltaWriter*> writers;
for (auto&& [tablet_id, base_writer] : _tablet_writers) {
Expand Down Expand Up @@ -129,7 +129,10 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo

for (auto&& [tablet_id, base_writer] : _tablet_writers) {
auto* writer = static_cast<CloudDeltaWriter*>(base_writer.get());
if (_partition_ids.contains(writer->partition_id())) {
// ATTN: the strict mode means strict filtering of column type conversions during import.
// Sometimes all inputs are filtered, but the partition ID is still set, and the writer is
// not initialized.
if (_partition_ids.contains(writer->partition_id()) && writer->is_init()) {
if (!success) { // Already failed, cancel all remain writers
static_cast<void>(writer->cancel());
continue;
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CloudTabletsChannel final : public BaseTabletsChannel {
PTabletWriterAddBlockResult* res, bool* finished) override;

private:
Status _init_writers_by_parition_ids(const std::unordered_set<int64_t>& partition_ids);
Status _init_writers_by_partition_ids(const std::unordered_set<int64_t>& partition_ids);

CloudStorageEngine& _engine;
};
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ DEFINE_Int32(memory_max_alignment, "16");
// memtable insert memory tracker will multiply input block size with this ratio
DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "104857600");
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
DEFINE_mInt64(write_buffer_size_for_agg, "419430400");
// max parallel flush task per memtable writer
Expand Down Expand Up @@ -804,6 +804,9 @@ DEFINE_mInt32(segment_compression_threshold_kb, "256");
// The connection timeout when connecting to external table such as odbc table.
DEFINE_mInt32(external_table_connect_timeout_sec, "30");

// Time to clean up useless JDBC connection pool cache
DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");

// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ DECLARE_Int32(memory_max_alignment);

// memtable insert memory tracker will multiply input block size with this ratio
DECLARE_mDouble(memtable_insert_memory_ratio);
// max write buffer size before flush, default 100MB
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
DECLARE_mInt64(write_buffer_size_for_agg);
Expand Down Expand Up @@ -858,6 +858,9 @@ DECLARE_mInt32(segment_compression_threshold_kb);
// The connection timeout when connecting to external table such as odbc table.
DECLARE_mInt32(external_table_connect_timeout_sec);

// Time to clean up useless JDBC connection pool cache
DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec);

// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
if (ref_block->get_by_position(result_column_id).column == nullptr) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} result column is nullptr",
ref_block->get_by_position(result_column_id).name);
}
ref_block->replace_by_position_if_const(result_column_id);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
_calc_delete_bitmap_executor(nullptr),
_default_rowset_type(BETA_ROWSET),
_stream_load_recorder(nullptr),
_create_tablet_idx_lru_cache(
Expand Down
10 changes: 4 additions & 6 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class BaseStorageEngine {
RowsetId next_rowset_id();

MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
return _calc_delete_bitmap_executor.get();
}

const std::shared_ptr<MemTracker>& segment_meta_mem_tracker() {
return _segment_meta_mem_tracker;
Expand All @@ -136,6 +139,7 @@ class BaseStorageEngine {

std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;

// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
// The memory consumed by querying is tracked in segment iterator.
Expand Down Expand Up @@ -203,10 +207,6 @@ class StorageEngine final : public BaseStorageEngine {
TxnManager* txn_manager() { return _txn_manager.get(); }
SnapshotManager* snapshot_mgr() { return _snapshot_mgr.get(); }
MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
return _calc_delete_bitmap_executor.get();
}

// Rowset garbage collection helpers
bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
PendingRowsetSet& pending_local_rowsets() { return _pending_local_rowsets; }
Expand Down Expand Up @@ -439,8 +439,6 @@ class StorageEngine final : public BaseStorageEngine {
std::unique_ptr<TabletManager> _tablet_manager;
std::unique_ptr<TxnManager> _txn_manager;

std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;

// Used to control the migration from segment_v1 to segment_v2; can be deleted in the future.
// Type of new loaded data
RowsetTypePB _default_rowset_type;
Expand Down
25 changes: 20 additions & 5 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,30 @@ Status PipelineTask::execute(bool* eos) {
if (!_opened) {
{
SCOPED_RAW_TIMER(&time_spent);
auto st = _open();
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
// A non-OK _open_status means _open() already ran on an earlier call and failed.
// Return that error now, unless it was PIP_WAIT_FOR_RF or PIP_WAIT_FOR_SC, in which case _open() is executed again below.
if (!_open_status.ok() && !_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() &&
!_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
return _open_status;
}
// _open() is executed here without checking dependencies first (e.g. before the second start RPC has arrived).
// If _open() fails and the error status is returned directly, the query will be cancelled,
// and the RPC that arrives later will not find the query because it has already been cancelled and removed.
_open_status = _open();
if (_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
return Status::OK();
} else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
} else if (_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
RETURN_IF_ERROR(st);
// If the status is not OK but the task still has a dependency, block on the dependency so the task is pushed back to the queue again.
if (!_open_status.ok() && has_dependency()) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
// if not ok and no dependency, return error to cancel.
RETURN_IF_ERROR(_open_status);
}
if (has_dependency()) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
Expand Down Expand Up @@ -297,7 +312,7 @@ Status PipelineTask::execute(bool* eos) {
}
}
}
if (*eos) { // now only join node have add_dependency, and join probe could start when the join sink is eos
if (*eos) { // now only join node/set operation node have add_dependency, and join probe could start when the join sink is eos
_finish_p_dependency();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class PipelineTask {
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;
Status _open_status = Status::OK();

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
Expand Down
33 changes: 18 additions & 15 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ std::string ODBCTableDescriptor::debug_string() const {

JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc),
_jdbc_catalog_id(tdesc.jdbcTable.catalog_id),
_jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name),
_jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url),
_jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class),
Expand All @@ -266,24 +267,26 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
_jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
_jdbc_user(tdesc.jdbcTable.jdbc_user),
_jdbc_passwd(tdesc.jdbcTable.jdbc_password),
_jdbc_min_pool_size(tdesc.jdbcTable.jdbc_min_pool_size),
_jdbc_max_pool_size(tdesc.jdbcTable.jdbc_max_pool_size),
_jdbc_max_idle_time(tdesc.jdbcTable.jdbc_max_idle_time),
_jdbc_max_wait_time(tdesc.jdbcTable.jdbc_max_wait_time),
_jdbc_keep_alive(tdesc.jdbcTable.jdbc_keep_alive) {}
_connection_pool_min_size(tdesc.jdbcTable.connection_pool_min_size),
_connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size),
_connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time),
_connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time),
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {}

// Builds a single human-readable string describing this JDBC table descriptor:
// the base TableDescriptor::debug_string() output followed by "name=value" pairs
// for the JDBC connection settings held by this object.
//
// NOTE(review): _jdbc_passwd is formatted into the returned string verbatim.
// Confirm this string never reaches logs or error messages, or mask the value.
// NOTE(review): this view appears to contain both the pre-change and the
// post-change fmt::format_to call of the commit; only the second one uses the
// _connection_pool_* member names declared in descriptors.h. Verify against the
// real file before relying on the exact output format.
std::string JdbcTableDescriptor::debug_string() const {
fmt::memory_buffer buf;
// Format using the older _jdbc_*_pool_size / _jdbc_max_*_time / _jdbc_keep_alive fields.
fmt::format_to(buf,
"JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_jdbc_min_pool_size={} "
",_jdbc_max_pool_size={} ,_jdbc_max_idle_time={} ,_jdbc_max_wait_time={} "
",_jdbc_keep_alive={})",
TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url,
_jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name,
_jdbc_user, _jdbc_passwd, _jdbc_min_pool_size, _jdbc_max_pool_size,
_jdbc_max_idle_time, _jdbc_max_wait_time, _jdbc_keep_alive);
// Format that additionally reports _jdbc_catalog_id and uses the _connection_pool_* fields.
fmt::format_to(
buf,
"JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={} ,_jdbc_driver_url={} "
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_connection_pool_min_size={} "
",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} "
",_connection_pool_max_life_time={} ,_connection_pool_keep_alive={})",
TableDescriptor::debug_string(), _jdbc_catalog_id, _jdbc_resource_name,
_jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url,
_jdbc_table_name, _jdbc_user, _jdbc_passwd, _connection_pool_min_size,
_connection_pool_max_size, _connection_pool_max_wait_time,
_connection_pool_max_life_time, _connection_pool_keep_alive);
// Convert the accumulated buffer into the std::string that is returned.
return fmt::to_string(buf);
}

Expand Down
22 changes: 12 additions & 10 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class JdbcTableDescriptor : public TableDescriptor {
public:
JdbcTableDescriptor(const TTableDescriptor& tdesc);
std::string debug_string() const override;
int64_t jdbc_catalog_id() const { return _jdbc_catalog_id; }
const std::string& jdbc_resource_name() const { return _jdbc_resource_name; }
const std::string& jdbc_driver_url() const { return _jdbc_driver_url; }
const std::string& jdbc_driver_class() const { return _jdbc_driver_class; }
Expand All @@ -318,13 +319,14 @@ class JdbcTableDescriptor : public TableDescriptor {
const std::string& jdbc_table_name() const { return _jdbc_table_name; }
const std::string& jdbc_user() const { return _jdbc_user; }
const std::string& jdbc_passwd() const { return _jdbc_passwd; }
int32_t jdbc_min_pool_size() const { return _jdbc_min_pool_size; }
int32_t jdbc_max_pool_size() const { return _jdbc_max_pool_size; }
int32_t jdbc_max_idle_time() const { return _jdbc_max_idle_time; }
int32_t jdbc_max_wait_time() const { return _jdbc_max_wait_time; }
bool jdbc_keep_alive() const { return _jdbc_keep_alive; }
int32_t connection_pool_min_size() const { return _connection_pool_min_size; }
int32_t connection_pool_max_size() const { return _connection_pool_max_size; }
int32_t connection_pool_max_wait_time() const { return _connection_pool_max_wait_time; }
int32_t connection_pool_max_life_time() const { return _connection_pool_max_life_time; }
bool connection_pool_keep_alive() const { return _connection_pool_keep_alive; }

private:
int64_t _jdbc_catalog_id;
std::string _jdbc_resource_name;
std::string _jdbc_driver_url;
std::string _jdbc_driver_class;
Expand All @@ -333,11 +335,11 @@ class JdbcTableDescriptor : public TableDescriptor {
std::string _jdbc_table_name;
std::string _jdbc_user;
std::string _jdbc_passwd;
int32_t _jdbc_min_pool_size;
int32_t _jdbc_max_pool_size;
int32_t _jdbc_max_idle_time;
int32_t _jdbc_max_wait_time;
bool _jdbc_keep_alive;
int32_t _connection_pool_min_size;
int32_t _connection_pool_max_size;
int32_t _connection_pool_max_wait_time;
int32_t _connection_pool_max_life_time;
bool _connection_pool_keep_alive;
};

class TupleDescriptor {
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,13 @@ void BaseTabletsChannel::_build_tablet_to_rowidxs(
// tests show that a relatively coarse-grained read lock here performs better under multicore scenario
// see: https://github.com/apache/doris/pull/28552
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
if (request.is_single_tablet_block()) {
// The cloud mode needs the tablet ids to prepare rowsets.
int64_t tablet_id = request.tablet_ids(0);
tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
return;
}
for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
if (request.is_single_tablet_block()) {
break;
}
int64_t tablet_id = request.tablet_ids(i);
if (_is_broken_tablet(tablet_id)) {
// skip broken tablets
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& col
const auto result_size = column->assume_mutable()->filter(filter);
if (result_size != count) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"result_size not euqal with filter_size, result_size={}, "
"result_size not equal with filter_size, result_size={}, "
"filter_size={}",
result_size, count);
}
Expand Down
11 changes: 5 additions & 6 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ Status JniConnector::close() {
Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
RETURN_IF_ERROR(
JniUtil::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls));
if (_jni_scanner_cls == NULL) {
if (env->ExceptionOccurred()) env->ExceptionDescribe();
if (_jni_scanner_cls == nullptr) {
if (env->ExceptionOccurred()) {
env->ExceptionDescribe();
}
return Status::InternalError("Fail to get JniScanner class.");
}
RETURN_ERROR_IF_EXC(env);
Expand Down Expand Up @@ -249,13 +251,10 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {

Status JniConnector::fill_block(Block* block, const ColumnNumbers& arguments, long table_address) {
if (table_address == 0) {
return Status::OK();
return Status::InternalError("table_address is 0");
}
TableMetaAddress table_meta(table_address);
long num_rows = table_meta.next_meta_as_long();
if (num_rows == 0) {
return Status::OK();
}
for (size_t i : arguments) {
if (block->get_by_position(i).column == nullptr) {
auto return_type = block->get_data_type(i);
Expand Down
Loading

0 comments on commit 9f4ff34

Please sign in to comment.