Skip to content

Commit

Permalink
canonical_mutation: add to_mutation_gently
Browse files Browse the repository at this point in the history
to_mutation_gently generates mutation from canonical_mutation
asynchronously using the newly introduced mutation_partition
accept_gently method.

Signed-off-by: Benny Halevy <[email protected]>
  • Loading branch information
bhalevy committed May 2, 2024
1 parent 7f7e461 commit c485ed6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
28 changes: 28 additions & 0 deletions mutation/async_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,31 @@ future<> apply_gently(mutation& target, const mutation& m) {
mutation_application_stats app_stats;
co_await apply_gently(target.partition(), *target.schema(), std::move(m2.partition()), *m.schema(), app_stats);
}

future<mutation> to_mutation_gently(const canonical_mutation& cm, schema_ptr s) {
auto in = ser::as_input_stream(cm.representation());
auto mv = ser::deserialize(in, boost::type<ser::canonical_mutation_view>());

auto cf_id = mv.table_id();
if (s->id() != cf_id) {
throw std::runtime_error(format("Attempted to deserialize canonical_mutation of table {} with schema of table {} ({}.{})",
cf_id, s->id(), s->ks_name(), s->cf_name()));
}

auto version = mv.schema_version();
auto pk = mv.key();

mutation m(std::move(s), std::move(pk));

if (version == m.schema()->version()) {
auto partition_view = mutation_partition_view::from_view(mv.partition());
mutation_application_stats app_stats;
co_await apply_gently(m.partition(), *m.schema(), partition_view, *m.schema(), app_stats);
} else {
column_mapping cm = mv.mapping();
converting_mutation_partition_applier v(cm, *m.schema(), m.partition());
auto partition_view = mutation_partition_view::from_view(mv.partition());
co_await partition_view.accept_gently(cm, v);
}
co_return m;
}
3 changes: 3 additions & 0 deletions mutation/async_utils.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "mutation_partition.hh"
#include "mutation.hh"
#include "canonical_mutation.hh"

//
// Applies p to the `target` mutation_partition.
Expand All @@ -33,3 +34,5 @@ future<> apply_gently(mutation_partition& target, const schema& target_schema, m
// applying changes to memtable, which is done synchronously.
future<> apply_gently(mutation& target, mutation&& m);
future<> apply_gently(mutation& target, const mutation& m);

future<mutation> to_mutation_gently(const canonical_mutation& cm, schema_ptr s);
19 changes: 12 additions & 7 deletions test/boost/canonical_mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@
#include <boost/test/unit_test.hpp>

#include "mutation/canonical_mutation.hh"
#include "mutation/async_utils.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"

#include "test/lib/scylla_test_case.hh"

#include <seastar/core/thread.hh>

SEASTAR_TEST_CASE(test_conversion_back_and_forth) {
return seastar::async([] {
for_each_mutation([] (const mutation& m) {
canonical_mutation cm(m);
assert_that(cm.to_mutation(m.schema())).is_equal_to(m);
});
});
SEASTAR_THREAD_TEST_CASE(test_conversion_back_and_forth) {
// FIXME: for (auto do_make_canonical_mutation_gently : {false, true}) {
for (auto do_make_mutation_gently : {false, true}) {
for_each_mutation([&] (const mutation& m) {
// FIXME: for now
canonical_mutation cm(m);
auto m2 = do_make_mutation_gently ? to_mutation_gently(cm, m.schema()).get() : cm.to_mutation(m.schema());
assert_that(m2).is_equal_to(m);
});
}
// FIXME }
}

SEASTAR_TEST_CASE(test_reading_with_different_schemas) {
Expand Down

0 comments on commit c485ed6

Please sign in to comment.