Skip to content

Commit

Permalink
Range Locking: Move locking secondary keys to right place
Browse files Browse the repository at this point in the history
From ha_rocksdb::update_write_sk into ha_rocksdb::check_and_lock_sk
  • Loading branch information
spetrunia committed Aug 23, 2021
1 parent a234810 commit 8c6c1b8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 17 deletions.
70 changes: 53 additions & 17 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10995,6 +10995,8 @@ int ha_rocksdb::check_and_lock_sk(
lock for this key.
*/
if (!(key_info->flags & HA_NOSAME)) {
if (rocksdb_use_range_locking)
return check_and_lock_non_unique_sk(key_id, row_info);
return HA_EXIT_SUCCESS;
}

Expand Down Expand Up @@ -11111,6 +11113,57 @@ int ha_rocksdb::check_and_lock_sk(
return rc;
}


/**
@brief
Lock the non-unique sk for range locking
*/
int ha_rocksdb::check_and_lock_non_unique_sk(
const uint key_id, const struct update_row_info &row_info) {

DBUG_ASSERT(rocksdb_use_range_locking);
const Rdb_key_def &kd = *m_key_descr_arr[key_id];
bool store_row_debug_checksums = should_store_row_debug_checksums();

if (row_info.old_data != nullptr) {
rocksdb::Slice old_key_slice;
int old_packed_size;

old_packed_size = kd.pack_record(
table, m_pack_buffer, row_info.old_data, m_sk_packed_tuple_old,
&m_sk_tails_old, store_row_debug_checksums, row_info.hidden_pk_id, 0,
nullptr, m_ttl_bytes);

old_key_slice = rocksdb::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple_old), old_packed_size);

auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice);
if (!s.ok()) {
return (row_info.tx->set_status_error(table->in_use, s, kd,
m_tbl_def, m_table_handler));
}
}

int new_packed_size;
rocksdb::Slice new_key_slice;
rocksdb::Slice new_value_slice;
new_packed_size =
kd.pack_record(table, m_pack_buffer, row_info.new_data,
m_sk_packed_tuple, &m_sk_tails, 0,
row_info.hidden_pk_id, 0, nullptr, m_ttl_bytes);
new_key_slice = rocksdb::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);

auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice);
if (!s.ok()) {
return (row_info.tx->set_status_error(table->in_use, s, kd,
m_tbl_def, m_table_handler));
}

return HA_EXIT_SUCCESS;
}


/**
Enumerate all keys to check their uniquess and also lock it

Expand Down Expand Up @@ -11428,15 +11481,6 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg,
old_key_slice = rocksdb::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple_old), old_packed_size);

/* Range locking: lock the index tuple being deleted */
if (rocksdb_use_range_locking) {
auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice);
if (!s.ok()) {
return (row_info.tx->set_status_error(table->in_use, s, kd,
m_tbl_def, m_table_handler));
}
}

// TODO(mung) - If the new_data and old_data below to the same partial index
// group (ie. have the same prefix), we can make use of the read below to
// determine whether to issue SingleDelete or not.
Expand Down Expand Up @@ -11482,14 +11526,6 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg,
if (bulk_load_sk && row_info.old_data == nullptr) {
rc = bulk_load_key(row_info.tx, kd, new_key_slice, new_value_slice, true);
} else {
/* Range locking: lock the index tuple being inserted */
if (rocksdb_use_range_locking) {
auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice);
if (!s.ok()) {
return (row_info.tx->set_status_error(table->in_use, s, kd,
m_tbl_def, m_table_handler));
}
}
row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice,
new_value_slice);
}
Expand Down
4 changes: 4 additions & 0 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ class ha_rocksdb : public my_core::handler {
const struct update_row_info &row_info,
bool *const found, const bool skip_unique_check)
MY_ATTRIBUTE((__warn_unused_result__));

int check_and_lock_non_unique_sk(const uint key_id,
const struct update_row_info &row_info)
MY_ATTRIBUTE((__warn_unused_result__));
int check_uniqueness_and_lock(const struct update_row_info &row_info,
bool pk_changed, const bool skip_unique_check)
MY_ATTRIBUTE((__warn_unused_result__));
Expand Down

0 comments on commit 8c6c1b8

Please sign in to comment.