From 8c6c1b80a0b08b896de495272a41c33fa5dd406e Mon Sep 17 00:00:00 2001 From: Sergei Petrunia Date: Mon, 23 Aug 2021 22:03:40 +0300 Subject: [PATCH] Range Locking: Move locking secondary keys to right place From ha_rocksdb::update_write_sk into ha_rocksdb::check_and_lock_sk --- storage/rocksdb/ha_rocksdb.cc | 70 ++++++++++++++++++++++++++--------- storage/rocksdb/ha_rocksdb.h | 4 ++ 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index b685bc11eb52..4bda73ff3d29 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -10995,6 +10995,8 @@ int ha_rocksdb::check_and_lock_sk( lock for this key. */ if (!(key_info->flags & HA_NOSAME)) { + if (rocksdb_use_range_locking) + return check_and_lock_non_unique_sk(key_id, row_info); return HA_EXIT_SUCCESS; } @@ -11111,6 +11113,57 @@ int ha_rocksdb::check_and_lock_sk( return rc; } + +/** + @brief + Lock the non-unique sk for range locking +*/ +int ha_rocksdb::check_and_lock_non_unique_sk( + const uint key_id, const struct update_row_info &row_info) { + + DBUG_ASSERT(rocksdb_use_range_locking); + const Rdb_key_def &kd = *m_key_descr_arr[key_id]; + bool store_row_debug_checksums = should_store_row_debug_checksums(); + + if (row_info.old_data != nullptr) { + rocksdb::Slice old_key_slice; + int old_packed_size; + + old_packed_size = kd.pack_record( + table, m_pack_buffer, row_info.old_data, m_sk_packed_tuple_old, + &m_sk_tails_old, store_row_debug_checksums, row_info.hidden_pk_id, 0, + nullptr, m_ttl_bytes); + + old_key_slice = rocksdb::Slice( + reinterpret_cast(m_sk_packed_tuple_old), old_packed_size); + + auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice); + if (!s.ok()) { + return (row_info.tx->set_status_error(table->in_use, s, kd, + m_tbl_def, m_table_handler)); + } + } + + int new_packed_size; + rocksdb::Slice new_key_slice; + rocksdb::Slice new_value_slice; + new_packed_size = + kd.pack_record(table, m_pack_buffer, row_info.new_data, + m_sk_packed_tuple, &m_sk_tails, 0, + row_info.hidden_pk_id, 0, nullptr, m_ttl_bytes); + new_key_slice = rocksdb::Slice( + reinterpret_cast(m_sk_packed_tuple), new_packed_size); + + auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice); + if (!s.ok()) { + return (row_info.tx->set_status_error(table->in_use, s, kd, + m_tbl_def, m_table_handler)); + } + + return HA_EXIT_SUCCESS; +} + + /** Enumerate all keys to check their uniquess and also lock it @@ -11428,15 +11481,6 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg, old_key_slice = rocksdb::Slice( reinterpret_cast(m_sk_packed_tuple_old), old_packed_size); - /* Range locking: lock the index tuple being deleted */ - if (rocksdb_use_range_locking) { - auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice); - if (!s.ok()) { - return (row_info.tx->set_status_error(table->in_use, s, kd, - m_tbl_def, m_table_handler)); - } - } - // TODO(mung) - If the new_data and old_data below to the same partial index // group (ie. have the same prefix), we can make use of the read below to // determine whether to issue SingleDelete or not. @@ -11482,14 +11526,6 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg, if (bulk_load_sk && row_info.old_data == nullptr) { rc = bulk_load_key(row_info.tx, kd, new_key_slice, new_value_slice, true); } else { - /* Range locking: lock the index tuple being inserted */ - if (rocksdb_use_range_locking) { - auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice); - if (!s.ok()) { - return (row_info.tx->set_status_error(table->in_use, s, kd, - m_tbl_def, m_table_handler)); - } - } row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice, new_value_slice); } diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index ebe14103de19..48ba50efd3b4 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -758,6 +758,10 @@ class ha_rocksdb : public my_core::handler { const struct update_row_info &row_info, bool *const found, const bool skip_unique_check) MY_ATTRIBUTE((__warn_unused_result__)); + + int check_and_lock_non_unique_sk(const uint key_id, + const struct update_row_info &row_info) + MY_ATTRIBUTE((__warn_unused_result__)); int check_uniqueness_and_lock(const struct update_row_info &row_info, bool pk_changed, const bool skip_unique_check) MY_ATTRIBUTE((__warn_unused_result__));