Skip to content

Commit

Permalink
assign varlen_resource correctly for aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Nov 8, 2024
1 parent 975d9e4 commit 5a7c42a
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 28 deletions.
9 changes: 8 additions & 1 deletion src/jogasaki/executor/exchange/aggregate/input_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ bool input_partition::write(accessor::record_ref record) {
for(std::size_t i=0, n = info.aggregator_specs().size(); i < n; ++i) {
auto& as = info.aggregator_specs()[i];
auto& aggregator = as.aggregator_info().aggregator();
aggregator(value, info.target_field_locator(i), initial, record, info.source_field_locators(i));
aggregator(
value,
info.target_field_locator(i),
initial,
record,
info.source_field_locators(i),
values_->varlen_resource()
);
}
if (hash_table_->load_factor() > load_factor_bound) {
flush();
Expand Down
15 changes: 12 additions & 3 deletions src/jogasaki/executor/exchange/aggregate/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ reader::reader(
key_comparator_(info_->mid().key_compare_info()),
pointer_field_offset_(
info_->mid().group_meta()->key().value_offset(info_->mid().group_meta()->key().field_count()-1)
)
),
varlen_resource_(std::make_unique<memory::monotonic_paged_memory_resource>(std::addressof(global::page_pool())))
{
for(auto& p : partitions_) {
if (!p) continue;
Expand Down Expand Up @@ -93,7 +94,8 @@ bool reader::next_group() {
info.target_field_locator(i),
initial,
src,
sequence_view{&info.target_field_locator(i)}
sequence_view{&info.target_field_locator(i)},
varlen_resource_.get()
);
}
initial = false;
Expand Down Expand Up @@ -142,7 +144,14 @@ accessor::record_ref reader::get_member() const {
for(std::size_t i=0, n=info.aggregator_specs().size(); i < n; ++i) {
auto& as = info.aggregator_specs()[i];
auto& aggregator = as.aggregator_info().aggregator();
aggregator(target, info.target_field_locator(i), false, ref, info.source_field_locators(i));
aggregator(
target,
info.target_field_locator(i),
false,
ref,
info.source_field_locators(i),
varlen_resource_.get()
);
}
return target;
}
Expand Down
1 change: 1 addition & 0 deletions src/jogasaki/executor/exchange/aggregate/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class cache_align reader : public io::group_reader {
std::size_t pointer_field_offset_{};
std::vector<std::vector<field_locator>> args_{};
bool internal_on_member_{};
std::unique_ptr<memory::paged_memory_resource> varlen_resource_{};

bool internal_next_member();
[[nodiscard]] accessor::record_ref internal_get_member() const;
Expand Down
3 changes: 2 additions & 1 deletion src/jogasaki/executor/function/incremental/aggregator_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ using aggregator_type = std::function<void (
field_locator const&,
bool,
accessor::record_ref,
sequence_view<field_locator const>
sequence_view<field_locator const>,
memory::paged_memory_resource* varlen_resource // used if aggregator creates new varlen data
)>;

/**
Expand Down
45 changes: 30 additions & 15 deletions src/jogasaki/executor/function/incremental/builtin_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,8 @@ void sum(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // `sum` does not create new varlen data
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
auto& arg_type = args[0].type();
Expand Down Expand Up @@ -693,7 +694,8 @@ void count_pre(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // `count` does not create new varlen data
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
BOOST_ASSERT(target_loc.type().kind() == kind::int8); //NOLINT
Expand All @@ -713,7 +715,8 @@ void count_mid(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // `count` does not create new varlen data
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
BOOST_ASSERT(args[0].type().kind() == kind::int8); //NOLINT
Expand All @@ -736,7 +739,8 @@ void count_rows_pre(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // `count` does not create new varlen data
) {
BOOST_ASSERT(args.size() == 0); //NOLINT
BOOST_ASSERT(target_loc.type().kind() == kind::int8); //NOLINT
Expand Down Expand Up @@ -769,7 +773,8 @@ void avg_post(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // `avg` does not create new varlen data
) {
BOOST_ASSERT(args.size() == 2); //NOLINT
(void)initial;
Expand Down Expand Up @@ -806,13 +811,13 @@ T max_or_min(bool max, T a, T b) {
return max ? a : b;
}


void max(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* varlen_resource // `max` needs to remember the maximum value
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
auto& arg_type = args[0].type();
Expand All @@ -829,7 +834,8 @@ void max(
target_nullity_offset,
source,
arg_offset,
src_nullity_offset
src_nullity_offset,
varlen_resource // copy upstream process varlen data to exchange's varlen_resource
);
return;
}
Expand All @@ -841,8 +847,11 @@ void max(
case kind::int8: target.set_value<runtime_t<kind::int8>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::int8>>(target_offset), source.get_value<runtime_t<kind::int8>>(arg_offset))); break;
case kind::float4: target.set_value<runtime_t<kind::float4>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::float4>>(target_offset), source.get_value<runtime_t<kind::float4>>(arg_offset))); break;
case kind::float8: target.set_value<runtime_t<kind::float8>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::float8>>(target_offset), source.get_value<runtime_t<kind::float8>>(arg_offset))); break;
case kind::character: target.set_value<runtime_t<kind::character>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::character>>(target_offset), source.get_value<runtime_t<kind::character>>(arg_offset))); break;
case kind::octet: target.set_value<runtime_t<kind::octet>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::octet>>(target_offset), source.get_value<runtime_t<kind::octet>>(arg_offset))); break;

// character and octet use varlen_resource to copy varlen data owned by the upstream process
case kind::character: target.set_value<runtime_t<kind::character>>(target_offset, accessor::text{varlen_resource, max_or_min(true, target.get_value<runtime_t<kind::character>>(target_offset), source.get_value<runtime_t<kind::character>>(arg_offset))}); break;
case kind::octet: target.set_value<runtime_t<kind::octet>>(target_offset, accessor::binary{varlen_resource, max_or_min(true, target.get_value<runtime_t<kind::octet>>(target_offset), source.get_value<runtime_t<kind::octet>>(arg_offset))}); break;

case kind::decimal: target.set_value<runtime_t<kind::decimal>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::decimal>>(target_offset), source.get_value<runtime_t<kind::decimal>>(arg_offset))); break;
case kind::date: target.set_value<runtime_t<kind::date>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::date>>(target_offset), source.get_value<runtime_t<kind::date>>(arg_offset))); break;
case kind::time_of_day: target.set_value<runtime_t<kind::time_of_day>>(target_offset, max_or_min(true, target.get_value<runtime_t<kind::time_of_day>>(target_offset), source.get_value<runtime_t<kind::time_of_day>>(arg_offset))); break;
Expand All @@ -856,7 +865,8 @@ void min(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* varlen_resource // `min` needs to remember the minimum value
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
auto& arg_type = args[0].type();
Expand All @@ -873,7 +883,8 @@ void min(
target_nullity_offset,
source,
arg_offset,
src_nullity_offset
src_nullity_offset,
varlen_resource // copy upstream process varlen data to exchange's varlen_resource
);
return;
}
Expand All @@ -885,8 +896,11 @@ void min(
case kind::int8: target.set_value<runtime_t<kind::int8>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::int8>>(target_offset), source.get_value<runtime_t<kind::int8>>(arg_offset))); break;
case kind::float4: target.set_value<runtime_t<kind::float4>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::float4>>(target_offset), source.get_value<runtime_t<kind::float4>>(arg_offset))); break;
case kind::float8: target.set_value<runtime_t<kind::float8>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::float8>>(target_offset), source.get_value<runtime_t<kind::float8>>(arg_offset))); break;
case kind::character: target.set_value<runtime_t<kind::character>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::character>>(target_offset), source.get_value<runtime_t<kind::character>>(arg_offset))); break;
case kind::octet: target.set_value<runtime_t<kind::octet>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::octet>>(target_offset), source.get_value<runtime_t<kind::octet>>(arg_offset))); break;

// character and octet use varlen_resource to copy varlen data owned by the upstream process
case kind::character: target.set_value<runtime_t<kind::character>>(target_offset, accessor::text{varlen_resource, max_or_min(false, target.get_value<runtime_t<kind::character>>(target_offset), source.get_value<runtime_t<kind::character>>(arg_offset))}); break;
case kind::octet: target.set_value<runtime_t<kind::octet>>(target_offset, accessor::binary{varlen_resource, max_or_min(false, target.get_value<runtime_t<kind::octet>>(target_offset), source.get_value<runtime_t<kind::octet>>(arg_offset))}); break;

case kind::decimal: target.set_value<runtime_t<kind::decimal>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::decimal>>(target_offset), source.get_value<runtime_t<kind::decimal>>(arg_offset))); break;
case kind::date: target.set_value<runtime_t<kind::date>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::date>>(target_offset), source.get_value<runtime_t<kind::date>>(arg_offset))); break;
case kind::time_of_day: target.set_value<runtime_t<kind::time_of_day>>(target_offset, max_or_min(false, target.get_value<runtime_t<kind::time_of_day>>(target_offset), source.get_value<runtime_t<kind::time_of_day>>(arg_offset))); break;
Expand All @@ -900,7 +914,8 @@ void identity_post(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* // assuming record is already copied to the exchange's varlen_resource in pre/mid
) {
BOOST_ASSERT(args.size() == 1); //NOLINT
(void)initial;
Expand Down
24 changes: 16 additions & 8 deletions src/jogasaki/executor/function/incremental/builtin_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,63 +43,71 @@ void sum(
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource*
);

void count_pre(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource*
);

void count_mid(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource*
);

void count_rows_pre(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource*
);

void avg_post(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource*
);

void max(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* varlen_resource
);

void min(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* varlen_resource
);

void identity_post(
accessor::record_ref target,
field_locator const& target_loc,
bool initial,
accessor::record_ref source,
sequence_view<field_locator const> args
sequence_view<field_locator const> args,
memory::paged_memory_resource* varlen_resource
);
} // namespace builtin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ TEST_F(sql_function_type_matrix_test, count_distinct_char) {
test_function_with_type<kind::int8>("count(distinct ", "CHAR(3)", "('AAA'),('BBB'),('AAA')", 2);
}

// count(distinct ...) over a CHAR(20) column with rows 'AAA', 'BBB', 'AAA'; two distinct values, so 2 is expected.
// Same data as the CHAR(3) case but with a wider column declaration.
// NOTE(review): test_function_with_type is defined elsewhere; presumably it builds a table with the given
// column type, inserts the listed rows, and checks the aggregate result - confirm against the fixture.
TEST_F(sql_function_type_matrix_test, count_distinct_char_20) {
test_function_with_type<kind::int8>("count(distinct ", "CHAR(20)", "('AAA'),('BBB'),('AAA')", 2);
}

// count(distinct ...) over a VARBINARY(3) column with rows '010101', '020202', '010101';
// two distinct values, so 2 is expected.
TEST_F(sql_function_type_matrix_test, count_distinct_varbinary) {
test_function_with_type<kind::int8>("count(distinct ", "VARBINARY(3)", "('010101'),('020202'),('010101')", 2);
}
Expand All @@ -92,6 +96,10 @@ TEST_F(sql_function_type_matrix_test, count_distinct_binary) {
test_function_with_type<kind::int8>("count(distinct ", "BINARY(3)", "('010101'),('020202'),('010101')", 2);
}

// count(distinct ...) over a BINARY(20) column with rows '010101', '020202', '010101';
// two distinct values, so 2 is expected. Same data as the BINARY(3) case with a wider column declaration.
TEST_F(sql_function_type_matrix_test, count_distinct_binary_20) {
test_function_with_type<kind::int8>("count(distinct ", "BINARY(20)", "('010101'),('020202'),('010101')", 2);
}

// count(distinct ...) over a DATE column with rows 2000-01-01, 2000-01-02, 2000-01-01;
// two distinct values, so 2 is expected.
TEST_F(sql_function_type_matrix_test, count_distinct_date) {
test_function_with_type<kind::int8>("count(distinct ", "DATE", "(DATE'2000-01-01'),(DATE'2000-01-02'),(DATE'2000-01-01')", 2);
}
Expand Down
8 changes: 8 additions & 0 deletions test/jogasaki/api/sql_function_type_matrix_count_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ TEST_F(sql_function_type_matrix_test, count_char) {
test_function_with_type<kind::int8>("count(", "CHAR(3)", "('AAA'),('BBB'),('CCC')", 3);
}

// count(...) over a CHAR(20) column with three rows ('AAA', 'BBB', 'CCC'); 3 is expected.
// Same data as the CHAR(3) case but with a wider column declaration.
// NOTE(review): test_function_with_type is defined elsewhere; presumably it builds a table with the given
// column type, inserts the listed rows, and checks the aggregate result - confirm against the fixture.
TEST_F(sql_function_type_matrix_test, count_char_20) {
test_function_with_type<kind::int8>("count(", "CHAR(20)", "('AAA'),('BBB'),('CCC')", 3);
}

// count(...) over a VARBINARY(3) column with three rows ('010101', '020202', '030303'); 3 is expected.
TEST_F(sql_function_type_matrix_test, count_varbinary) {
test_function_with_type<kind::int8>("count(", "VARBINARY(3)", "('010101'),('020202'),('030303')", 3);
}
Expand All @@ -88,6 +92,10 @@ TEST_F(sql_function_type_matrix_test, count_binary) {
test_function_with_type<kind::int8>("count(", "BINARY(3)", "('010101'),('020202'),('030303')", 3);
}

// count(...) over a BINARY(20) column with three rows ('010101', '020202', '030303'); 3 is expected.
// Same data as the BINARY(3) case with a wider column declaration.
TEST_F(sql_function_type_matrix_test, count_binary_20) {
test_function_with_type<kind::int8>("count(", "BINARY(20)", "('010101'),('020202'),('030303')", 3);
}

// count(...) over a DATE column with three rows (2000-01-01, 2000-01-02, 2000-01-03); 3 is expected.
TEST_F(sql_function_type_matrix_test, count_date) {
test_function_with_type<kind::int8>("count(", "DATE", "(DATE'2000-01-01'),(DATE'2000-01-02'),(DATE'2000-01-03')", 3);
}
Expand Down
8 changes: 8 additions & 0 deletions test/jogasaki/api/sql_function_type_matrix_max_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ TEST_F(sql_function_type_matrix_test, max_char) {
test_function_with_type<kind::character>("max(", "CHAR(3)", "('AAA'),('BBB'),('CCC')", accessor::text{"CCC"});
}

// max(...) over a CHAR(20) column with rows 'AAA', 'BBB', 'CCC'.
// The expected value is 'CCC' followed by 17 blanks, i.e. padded to the declared length of 20,
// in line with the 20-byte zero-padded expectation used by max_binary_20.
// NOTE(review): test_function_with_type is defined elsewhere; presumably it builds a table with the given
// column type, inserts the listed rows, and checks the aggregate result - confirm against the fixture.
TEST_F(sql_function_type_matrix_test, max_char_20) {
    test_function_with_type<kind::character>("max(", "CHAR(20)", "('AAA'),('BBB'),('CCC')", accessor::text{"CCC                 "});
}

// max(...) over a VARBINARY(3) column with rows '010101', '020202', '030303';
// the expected result is the three bytes 03 03 03.
TEST_F(sql_function_type_matrix_test, max_varbinary) {
test_function_with_type<kind::octet>("max(", "VARBINARY(3)", "('010101'),('020202'),('030303')", accessor::binary{"\x03\x03\x03"});
}
Expand All @@ -87,6 +91,10 @@ TEST_F(sql_function_type_matrix_test, max_binary) {
test_function_with_type<kind::octet>("max(", "BINARY(3)", "('010101'),('020202'),('030303')", accessor::binary{"\x03\x03\x03"});
}

// max(...) over a BINARY(20) column with rows '010101', '020202', '030303'.
// The expected literal is 03 03 03 followed by 17 zero bytes (20 bytes in total).
// NOTE(review): the expected literal contains embedded NUL bytes - confirm that the accessor::binary
// constructor selected here takes the full literal length rather than stopping at the first NUL.
TEST_F(sql_function_type_matrix_test, max_binary_20) {
test_function_with_type<kind::octet>("max(", "BINARY(20)", "('010101'),('020202'),('030303')", accessor::binary{"\x03\x03\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"});
}

// max(...) over a DATE column with rows 2000-01-01, 2000-01-02, 2000-01-03; date{2000, 1, 3} is expected.
TEST_F(sql_function_type_matrix_test, max_date) {
test_function_with_type<kind::date>("max(", "DATE", "(DATE'2000-01-01'),(DATE'2000-01-02'),(DATE'2000-01-03')", date{2000, 1, 3});
}
Expand Down
8 changes: 8 additions & 0 deletions test/jogasaki/api/sql_function_type_matrix_min_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ TEST_F(sql_function_type_matrix_test, min_char) {
test_function_with_type<kind::character>("min(", "CHAR(3)", "('AAA'),('BBB'),('CCC')", accessor::text{"AAA"});
}

// min(...) over a CHAR(20) column with rows 'AAA', 'BBB', 'CCC'.
// The expected value is 'AAA' followed by 17 blanks, i.e. padded to the declared length of 20,
// in line with the 20-byte zero-padded expectation used by min_binary_20.
// NOTE(review): test_function_with_type is defined elsewhere; presumably it builds a table with the given
// column type, inserts the listed rows, and checks the aggregate result - confirm against the fixture.
TEST_F(sql_function_type_matrix_test, min_char_20) {
    test_function_with_type<kind::character>("min(", "CHAR(20)", "('AAA'),('BBB'),('CCC')", accessor::text{"AAA                 "});
}

// min(...) over a VARBINARY(3) column with rows '010101', '020202', '030303';
// the expected result is the three bytes 01 01 01.
TEST_F(sql_function_type_matrix_test, min_varbinary) {
test_function_with_type<kind::octet>("min(", "VARBINARY(3)", "('010101'),('020202'),('030303')", accessor::binary{"\x01\x01\x01"});
}
Expand All @@ -83,6 +87,10 @@ TEST_F(sql_function_type_matrix_test, min_binary) {
test_function_with_type<kind::octet>("min(", "BINARY(3)", "('010101'),('020202'),('030303')", accessor::binary{"\x01\x01\x01"});
}

// min(...) over a BINARY(20) column with rows '010101', '020202', '030303'.
// The expected literal is 01 01 01 followed by 17 zero bytes (20 bytes in total).
// NOTE(review): the expected literal contains embedded NUL bytes - confirm that the accessor::binary
// constructor selected here takes the full literal length rather than stopping at the first NUL.
TEST_F(sql_function_type_matrix_test, min_binary_20) {
test_function_with_type<kind::octet>("min(", "BINARY(20)", "('010101'),('020202'),('030303')", accessor::binary{"\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"});
}

// min(...) over a DATE column with rows 2000-01-01, 2000-01-02, 2000-01-03; date{2000, 1, 1} is expected.
TEST_F(sql_function_type_matrix_test, min_date) {
test_function_with_type<kind::date>("min(", "DATE", "(DATE'2000-01-01'),(DATE'2000-01-02'),(DATE'2000-01-03')", date{2000, 1, 1});
}
Expand Down

0 comments on commit 5a7c42a

Please sign in to comment.