Skip to content

Commit

Permalink
Merge remote-tracking branch 'pika/OpenAtom-S3' into feat/s3
Browse files Browse the repository at this point in the history
  • Loading branch information
longfar-ncy committed Sep 26, 2023
2 parents 9cd3390 + d06ec36 commit c236acf
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 2 deletions.
1 change: 1 addition & 0 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class PikaClientProcessor {
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
Expand Down
4 changes: 3 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ class PikaServer : public pstd::noncopyable {
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();

/*
* BGSave used
Expand Down Expand Up @@ -518,7 +519,8 @@ class PikaServer : public pstd::noncopyable {
void AutoDeleteExpiredDump();
void AutoKeepAliveRSync();
void AutoUpdateNetworkMetric();

void PrintThreadPoolQueueStatus();

std::string host_;
int port_ = 0;
time_t start_time_s_ = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/pika_client_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ size_t PikaClientProcessor::ThreadPoolCurQueueSize() {
}
return cur_size;
}

size_t PikaClientProcessor::ThreadPoolMaxQueueSize() {
size_t cur_size = 0;
if (pool_) {
cur_size = pool_->max_queue_size();
}
return cur_size;
}
21 changes: 21 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;
extern std::unique_ptr<net::NetworkStatistic> g_network_statistic;
// QUEUE_SIZE_THRESHOLD_PERCENTAGE is used to represent a percentage value and should be within the range of 0 to 100.
const size_t QUEUE_SIZE_THRESHOLD_PERCENTAGE = 75;

void DoPurgeDir(void* arg) {
std::unique_ptr<std::string> path(static_cast<std::string*>(arg));
Expand Down Expand Up @@ -854,6 +856,13 @@ size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() {
return pika_client_processor_->ThreadPoolCurQueueSize();
}

size_t PikaServer::ClientProcessorThreadPoolMaxQueueSize() {
if (!pika_client_processor_) {
return 0;
}
return pika_client_processor_->ThreadPoolMaxQueueSize();
}

void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(func, arg);
Expand Down Expand Up @@ -1298,6 +1307,8 @@ void PikaServer::DoTimingTask() {
ResetLastSecQuerynum();
// Auto update network instantaneous metric
AutoUpdateNetworkMetric();
// Print the queue status periodically
PrintThreadPoolQueueStatus();
}

void PikaServer::AutoCompactRange() {
Expand Down Expand Up @@ -1491,6 +1502,16 @@ void PikaServer::AutoUpdateNetworkMetric() {
current_time, factor);
}

void PikaServer::PrintThreadPoolQueueStatus() {
// Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size.
size_t cur_size = ClientProcessorThreadPoolCurQueueSize();
size_t max_size = ClientProcessorThreadPoolMaxQueueSize();
size_t thread_hold = (max_size / 100) * QUEUE_SIZE_THRESHOLD_PERCENTAGE;
if (cur_size > thread_hold) {
LOG(INFO) << "The current queue size of the Pika Server's client thread processor thread pool: " << cur_size;
}
}

void PikaServer::InitStorageOptions() {
std::lock_guard rwl(storage_options_rw_);

Expand Down
5 changes: 4 additions & 1 deletion src/pstd/src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ bool DeleteDirIfExist(const std::string& path) {
uint64_t Du(const std::string& path) {
uint64_t sum = 0;
try {
if (!filesystem::exists(path)) {
return 0;
}
if (filesystem::is_symlink(path)) {
filesystem::path symlink_path = filesystem::read_symlink(path);
sum = Du(symlink_path);
Expand All @@ -205,7 +208,7 @@ uint64_t Du(const std::string& path) {
sum = filesystem::file_size(path);
}
} catch (const filesystem::filesystem_error& ex) {
LOG(WARNING) << "Error accessing path: " << ex.what() << std::endl;
LOG(WARNING) << "Error accessing path: " << ex.what();
}

return sum;
Expand Down

0 comments on commit c236acf

Please sign in to comment.