Skip to content

Commit

Permalink
Merge pull request #216 from psychocrypt/topic-deferlock
Browse files Browse the repository at this point in the history
optimze queue lock handling
  • Loading branch information
fireice-uk authored Aug 23, 2019
2 parents 6a10723 + c6a45c2 commit d4178d7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/common/gulps.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ class gulps

void output_main()
{
std::unique_lock<std::mutex> lck;
std::unique_lock<std::mutex> lck = msg_q.get_lock();
while(msg_q.wait_for_pop(lck))
{
message msg = msg_q.pop(lck);
Expand Down
9 changes: 7 additions & 2 deletions src/common/thdq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#pragma once
#pragma once

#include <queue>
#include <list>
Expand Down Expand Up @@ -69,9 +69,14 @@ class thdq
return false;
}

std::unique_lock<std::mutex> get_lock()
{
return std::unique_lock<std::mutex>(mutex_, std::defer_lock);
}

bool wait_for_pop(std::unique_lock<std::mutex>& lck)
{
lck = std::unique_lock<std::mutex>(mutex_);
lck.lock();
while (queue_.empty() && !finish) { cond_.wait(lck); }
bool has_pop = !queue_.empty();
if(!has_pop) { lck.unlock(); }
Expand Down
12 changes: 6 additions & 6 deletions src/wallet/wallet2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2013,18 +2013,18 @@ void wallet2::integrate_scanned_result(std::unique_ptr<wallet_rpc_scan_data>& re

// Do outgoing funds
bool needs_full_scan = false;
for(const auto& n : m_key_images)
for(const auto& n : m_key_images)
{
if(!res->key_images.not_present(&n.first, sizeof(crypto::key_image)))
{
needs_full_scan = true;
break;
}
}

if(!needs_full_scan)
{
for(const auto& n : res->incoming_kimg)
for(const auto& n : res->incoming_kimg)
{
if(!res->key_images.not_present(&n, sizeof(crypto::key_image)))
{
Expand Down Expand Up @@ -2075,7 +2075,7 @@ void wallet2::integrate_scanned_result(std::unique_ptr<wallet_rpc_scan_data>& re

for(auto& v : tx_calls)
calls.emplace_back(std::move(v.second));

std::sort(calls.begin(), calls.end(), [](const call_pair& a, const call_pair& b)
{ return a.second < b.second; });

Expand Down Expand Up @@ -2124,7 +2124,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t &blocks_fetched, bool &rec

wallet_scan_ctx ct(*this, refresh_ctx);
refresh_ctx.m_running_scan_thd_cnt = thd_max;

GULPSF_LOG_L1("Running {} scanning threads", refresh_ctx.m_running_scan_thd_cnt);

std::vector<std::thread> scan_thds;
Expand All @@ -2134,7 +2134,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t &blocks_fetched, bool &rec

size_t result_idx=0;
std::list<std::unique_ptr<wallet2::wallet_rpc_scan_data>> result_list;
std::unique_lock<std::mutex> lck;
std::unique_lock<std::mutex> lck = refresh_ctx.m_scan_out_queue.get_lock();
while(refresh_ctx.m_scan_out_queue.wait_for_pop(lck))
{
result_list.emplace_back(refresh_ctx.m_scan_out_queue.pop(lck));
Expand Down
16 changes: 8 additions & 8 deletions src/wallet/wallet2_tx_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void wallet2::block_download_thd(wallet2::wallet_block_dl_ctx& ctx)
{
GULPS_LOG_L1("No more blocks from daemon.");
ctx.refresh_ctx.m_scan_in_queue.set_finish_flag();
ctx.refreshed = true;
ctx.refreshed = true;
return;
}

Expand All @@ -116,7 +116,7 @@ void wallet2::block_download_thd(wallet2::wallet_block_dl_ctx& ctx)
ctx.start_height = 0;
first_round = false;
}

drop_from_short_history(ctx.short_chain_history, 3);
// prepend the last 3 blocks, should be enough to guard against a block or two's reorg
cryptonote::block bl;
Expand Down Expand Up @@ -198,7 +198,7 @@ bool wallet2::block_scan_tx(const wallet_scan_ctx& ctx, const crypto::hash& txid
#else
derivation_res = crypto::generate_key_derivation(tx_pub_key, keys.m_view_secret_key, derivation);
#endif

if(!derivation_res)
{
GULPS_WARN("Failed to generate key derivation from tx pubkey, skipping");
Expand Down Expand Up @@ -299,13 +299,13 @@ void wallet2::block_scan_thd(const wallet_scan_ctx& ctx)
{
try
{
std::unique_lock<std::mutex> lck;
std::unique_lock<std::mutex> lck = ctx.refresh_ctx.m_scan_in_queue.get_lock();
while(ctx.refresh_ctx.m_scan_in_queue.wait_for_pop(lck))
{
std::unique_ptr<wallet2::wallet_rpc_scan_data> pull_res = ctx.refresh_ctx.m_scan_in_queue.pop(lck);

GULPSF_LOG_L1("Scanning blocks {} - {}", pull_res->blocks_start_height, pull_res->blocks_start_height+pull_res->blocks_bin.size()-1);

if(ctx.refresh_ctx.m_scan_error)
{
GULPS_LOG_L1("block_scan_thd exits due to m_scan_error.");
Expand All @@ -327,16 +327,16 @@ void wallet2::block_scan_thd(const wallet_scan_ctx& ctx)
THROW_WALLET_EXCEPTION_IF(!r, error::block_parse_error, bl.block);
blke.block_hash = get_block_hash(blke.block);
THROW_WALLET_EXCEPTION_IF(bl.txs.size() + 1 != blk_o_idx.indices.size(), error::wallet_internal_error,
"block transactions=" + std::to_string(bl.txs.size()) + " not match with daemon response size=" +
"block transactions=" + std::to_string(bl.txs.size()) + " not match with daemon response size=" +
std::to_string(blk_o_idx.indices.size()) + " for block " + std::to_string(blke.block_height));
THROW_WALLET_EXCEPTION_IF(bl.txs.size() != blke.block.tx_hashes.size(), error::wallet_internal_error,
THROW_WALLET_EXCEPTION_IF(bl.txs.size() != blke.block.tx_hashes.size(), error::wallet_internal_error,
"Wrong amount of transactions for block");

blk_i++;
if(!ctx.explicit_refresh && (blke.block.timestamp + 60 * 60 * 24 <= ctx.wallet_create_time || blke.block_height < ctx.refresh_height))
{
if(blke.block_height % 100 == 0)
GULPS_LOG_L2("Skipped block by timestamp, height: ", blke.block_height, ", block time ",
GULPS_LOG_L2("Skipped block by timestamp, height: ", blke.block_height, ", block time ",
blke.block.timestamp, ", account time ", ctx.wallet_create_time);
blke.skipped = true;
continue;
Expand Down

0 comments on commit d4178d7

Please sign in to comment.