-
Notifications
You must be signed in to change notification settings - Fork 964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(search_family): Remove the output of extra fields in the FT.AGGREGATE command #4231
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,10 +11,10 @@ namespace dfly::aggregate { | |
namespace { | ||
|
||
struct GroupStep { | ||
PipelineResult operator()(std::vector<DocValues> values) { | ||
PipelineResult operator()(PipelineResult result) { | ||
// Separate items into groups | ||
absl::flat_hash_map<absl::FixedArray<Value>, std::vector<DocValues>> groups; | ||
for (auto& value : values) { | ||
for (auto& value : result.values) { | ||
groups[Extract(value)].push_back(std::move(value)); | ||
} | ||
|
||
|
@@ -28,7 +28,18 @@ struct GroupStep { | |
} | ||
out.push_back(std::move(doc)); | ||
} | ||
return out; | ||
|
||
absl::flat_hash_set<std::string> fields_to_print; | ||
fields_to_print.reserve(fields_.size() + reducers_.size()); | ||
|
||
for (auto& field : fields_) { | ||
fields_to_print.insert(std::move(field)); | ||
} | ||
for (auto& reducer : reducers_) { | ||
fields_to_print.insert(std::move(reducer.result_field)); | ||
} | ||
|
||
return {std::move(out), std::move(fields_to_print)}; | ||
} | ||
|
||
absl::FixedArray<Value> Extract(const DocValues& dv) { | ||
|
@@ -104,34 +115,42 @@ PipelineStep MakeGroupStep(absl::Span<const std::string_view> fields, | |
} | ||
|
||
PipelineStep MakeSortStep(std::string_view field, bool descending) { | ||
return [field = std::string(field), descending](std::vector<DocValues> values) -> PipelineResult { | ||
return [field = std::string(field), descending](PipelineResult result) -> PipelineResult { | ||
auto& values = result.values; | ||
|
||
std::sort(values.begin(), values.end(), [field](const DocValues& l, const DocValues& r) { | ||
auto it1 = l.find(field); | ||
auto it2 = r.find(field); | ||
return it1 == l.end() || (it2 != r.end() && it1->second < it2->second); | ||
}); | ||
if (descending) | ||
|
||
if (descending) { | ||
std::reverse(values.begin(), values.end()); | ||
return values; | ||
} | ||
|
||
result.fields_to_print.insert(field); | ||
return result; | ||
}; | ||
} | ||
|
||
PipelineStep MakeLimitStep(size_t offset, size_t num) { | ||
return [offset, num](std::vector<DocValues> values) -> PipelineResult { | ||
return [offset, num](PipelineResult result) { | ||
auto& values = result.values; | ||
values.erase(values.begin(), values.begin() + std::min(offset, values.size())); | ||
values.resize(std::min(num, values.size())); | ||
return values; | ||
return result; | ||
}; | ||
} | ||
|
||
PipelineResult Process(std::vector<DocValues> values, absl::Span<const PipelineStep> steps) { | ||
PipelineResult Process(std::vector<DocValues> values, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please put on your prod-duty hat and think about all the VLOG statements you would like to have if we run into problems in production with this code. |
||
absl::Span<const std::string_view> fields_to_print, | ||
absl::Span<const PipelineStep> steps) { | ||
PipelineResult result{std::move(values), {fields_to_print.begin(), fields_to_print.end()}}; | ||
for (auto& step : steps) { | ||
auto result = step(std::move(values)); | ||
if (!result.has_value()) | ||
return result; | ||
values = std::move(result.value()); | ||
PipelineResult step_result = step(std::move(result)); | ||
result = std::move(step_result); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would prefer to see the step method modifying the result instead of all these moves. |
||
} | ||
return values; | ||
return result; | ||
} | ||
|
||
} // namespace dfly::aggregate |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
#pragma once | ||
|
||
#include <absl/container/flat_hash_map.h> | ||
#include <absl/container/flat_hash_set.h> | ||
#include <absl/types/span.h> | ||
|
||
#include <string> | ||
|
@@ -19,10 +20,16 @@ namespace dfly::aggregate { | |
using Value = ::dfly::search::SortableValue; | ||
using DocValues = absl::flat_hash_map<std::string, Value>; // documents sent through the pipeline | ||
|
||
// TODO: Replace DocValues with compact linear search map instead of hash map | ||
struct PipelineResult { | ||
// Values to be passed to the next step | ||
// TODO: Replace DocValues with compact linear search map instead of hash map | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this TODO relevant? I do not see a hash map here. |
||
std::vector<DocValues> values; | ||
|
||
using PipelineResult = io::Result<std::vector<DocValues>, facade::ErrorReply>; | ||
using PipelineStep = std::function<PipelineResult(std::vector<DocValues>)>; // Group, Sort, etc. | ||
// Fields from values to be printed | ||
absl::flat_hash_set<std::string> fields_to_print; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why flat_hash_set? |
||
}; | ||
|
||
using PipelineStep = std::function<PipelineResult(PipelineResult)>; // Group, Sort, etc. | ||
|
||
// Iterator over Span<DocValues> that yields doc[field] or monostate if not present. | ||
// Extra clumsy for STL compatibility! | ||
|
@@ -82,6 +89,8 @@ PipelineStep MakeSortStep(std::string_view field, bool descending = false); | |
PipelineStep MakeLimitStep(size_t offset, size_t num); | ||
|
||
// Process values with given steps | ||
PipelineResult Process(std::vector<DocValues> values, absl::Span<const PipelineStep> steps); | ||
PipelineResult Process(std::vector<DocValues> values, | ||
absl::Span<const std::string_view> fields_to_print, | ||
absl::Span<const PipelineStep> steps); | ||
|
||
} // namespace dfly::aggregate |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -980,22 +980,35 @@ void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilde | |
make_move_iterator(sub_results.end())); | ||
} | ||
|
||
auto agg_results = aggregate::Process(std::move(values), params->steps); | ||
if (!agg_results.has_value()) | ||
return builder->SendError(agg_results.error()); | ||
std::vector<std::string_view> load_fields; | ||
if (params->load_fields) { | ||
load_fields.reserve(params->load_fields->size()); | ||
for (const auto& field : params->load_fields.value()) { | ||
load_fields.push_back(field.GetShortName()); | ||
} | ||
} | ||
|
||
auto agg_results = aggregate::Process(std::move(values), load_fields, params->steps); | ||
|
||
size_t result_size = agg_results->size(); | ||
auto* rb = static_cast<RedisReplyBuilder*>(builder); | ||
auto sortable_value_sender = SortableValueSender(rb); | ||
|
||
const size_t result_size = agg_results.values.size(); | ||
rb->StartArray(result_size + 1); | ||
rb->SendLong(result_size); | ||
|
||
for (const auto& result : agg_results.value()) { | ||
rb->StartArray(result.size() * 2); | ||
for (const auto& [k, v] : result) { | ||
rb->SendBulkString(k); | ||
std::visit(sortable_value_sender, v); | ||
const size_t field_count = agg_results.fields_to_print.size(); | ||
for (const auto& value : agg_results.values) { | ||
rb->StartArray(field_count * 2); | ||
for (const auto& field : agg_results.fields_to_print) { | ||
rb->SendBulkString(field); | ||
|
||
auto it = value.find(field); | ||
if (it != value.end()) { | ||
Comment on lines
+1006
to
+1007
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: if (auto it = value.find(field); it != value.end()) |
||
std::visit(sortable_value_sender, it->second); | ||
} else { | ||
rb->SendNull(); | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of reversing afterwards, you can sort in the correct order directly.