Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
gdliu3 committed Jul 29, 2024
1 parent ab7d7a8 commit 98f27de
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 26 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._output_vexpr_ctxs, *input_block, &block));
std::shared_ptr<arrow::Schema> block_arrow_schema;
// After expr executed, use recaculated schema as final schema
RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));
RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
local_state._queue->blocking_put(result);
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "result_sink_operator.h"

#include <sys/select.h>

#include <memory>
#include <utility>

Expand Down Expand Up @@ -79,7 +81,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
if (state->query_options().enable_parallel_result_sink) {
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
} else {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
return false;
}
auto wg_ptr = _wg_wptr.lock();
if (!wg_ptr) {
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
_limiter_tracker_raw->release(size); // rollback
return false;
Expand Down
37 changes: 21 additions & 16 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ namespace doris {

using strings::Substitute;

Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result) {
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
const std::string& timezone) {
switch (type.type) {
case TYPE_NULL:
*result = arrow::null();
Expand Down Expand Up @@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
break;
case TYPE_DATETIMEV2:
if (type.scale > 3) {
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
} else if (type.scale > 0) {
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI);
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI, timezone);
} else {
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND, timezone);
}
break;
case TYPE_DECIMALV2:
Expand All @@ -120,16 +121,16 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_ARRAY: {
DCHECK_EQ(type.children.size(), 1);
std::shared_ptr<arrow::DataType> item_type;
static_cast<void>(convert_to_arrow_type(type.children[0], &item_type));
static_cast<void>(convert_to_arrow_type(type.children[0], &item_type, timezone));
*result = std::make_shared<arrow::ListType>(item_type);
break;
}
case TYPE_MAP: {
DCHECK_EQ(type.children.size(), 2);
std::shared_ptr<arrow::DataType> key_type;
std::shared_ptr<arrow::DataType> val_type;
static_cast<void>(convert_to_arrow_type(type.children[0], &key_type));
static_cast<void>(convert_to_arrow_type(type.children[1], &val_type));
static_cast<void>(convert_to_arrow_type(type.children[0], &key_type, timezone));
static_cast<void>(convert_to_arrow_type(type.children[1], &val_type, timezone));
*result = std::make_shared<arrow::MapType>(key_type, val_type);
break;
}
Expand All @@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
std::vector<std::shared_ptr<arrow::Field>> fields;
for (size_t i = 0; i < type.children.size(); i++) {
std::shared_ptr<arrow::DataType> field_type;
static_cast<void>(convert_to_arrow_type(type.children[i], &field_type));
static_cast<void>(convert_to_arrow_type(type.children[i], &field_type, timezone));
fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
type.contains_nulls[i]));
}
Expand All @@ -156,20 +157,22 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
return Status::OK();
}

Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field) {
Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field,
const std::string& timezone) {
std::shared_ptr<arrow::DataType> type;
RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type));
RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone));
*field = arrow::field(desc->col_name(), type, desc->is_nullable());
return Status::OK();
}

Status convert_block_arrow_schema(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result) {
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& type_and_name : block) {
std::shared_ptr<arrow::DataType> arrow_type;
RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(),
&arrow_type));
&arrow_type, timezone));
fields.push_back(std::make_shared<arrow::Field>(type_and_name.name, arrow_type,
type_and_name.type->is_nullable()));
}
Expand All @@ -178,12 +181,13 @@ Status convert_block_arrow_schema(const vectorized::Block& block,
}

Status convert_to_arrow_schema(const RowDescriptor& row_desc,
std::shared_ptr<arrow::Schema>* result) {
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto tuple_desc : row_desc.tuple_descriptors()) {
for (auto desc : tuple_desc->slots()) {
std::shared_ptr<arrow::Field> field;
RETURN_IF_ERROR(convert_to_arrow_field(desc, &field));
RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone));
fields.push_back(field);
}
}
Expand All @@ -192,12 +196,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
}

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result) {
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> arrow_type;
auto root_expr = output_vexpr_ctxs.at(i)->root();
RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone));
auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty()
? root_expr->expr_label()
: fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
Expand Down
11 changes: 7 additions & 4 deletions be/src/util/arrow/row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ namespace doris {

class RowDescriptor;

Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result);
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
const std::string& timezone);

// Convert Doris RowDescriptor to Arrow Schema.
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
std::shared_ptr<arrow::Schema>* result);
std::shared_ptr<arrow::Schema>* result, const std::string& timezone);

Status convert_block_arrow_schema(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result);
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result);
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);

Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/runtime/vparquet_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ Status VParquetTransformer::_parse_schema() {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> type;
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
_state->timezone()));
if (_parquet_schemas != nullptr) {
std::shared_ptr<arrow::Field> field =
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
Expand Down
4 changes: 2 additions & 2 deletions be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() {
RowDescriptor row_desc(&tuple_desc, true);
// arrow schema
std::shared_ptr<arrow::Schema> _arrow_schema;
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());

// serialize
std::shared_ptr<arrow::RecordBatch> result;
Expand Down Expand Up @@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
RowDescriptor row_desc(&tuple_desc, true);
// arrow schema
std::shared_ptr<arrow::Schema> _arrow_schema;
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());

// serialize
std::shared_ptr<arrow::RecordBatch> result;
Expand Down
4 changes: 4 additions & 0 deletions regression-test/data/arrow_flight_sql_p0/test_select.out
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
-- !arrow_flight_sql --
777 4

-- !arrow_flight_sql_datetime --
333 plsql333 2024-07-21 04:00:00.123456 2024-07-21 04:00:00.0
222 plsql222 2024-07-20 04:00:00.123456 2024-07-20 04:00:00.0
111 plsql111 2024-07-19 04:00:00.123456 2024-07-19 04:00:00.0
12 changes: 12 additions & 0 deletions regression-test/suites/arrow_flight_sql_p0/test_select.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") {
sql """INSERT INTO ${tableName} VALUES(111, "plsql333")"""

qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}"

tableName = "test_select_datetime"
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
create table ${tableName} (id int, name varchar(20), f_datetime_p datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash (`id`) buckets 4
properties ("replication_num"="1");
"""
sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 12:00:00.123456","2024-07-19 12:00:00")"""
sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 12:00:00.123456","2024-07-20 12:00:00")"""
sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 12:00:00.123456","2024-07-21 12:00:00")"""

qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc"
}

0 comments on commit 98f27de

Please sign in to comment.