Skip to content

Commit

Permalink
add more log for easy debug & track connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
MOON-CLJ committed Jan 27, 2019
1 parent b69f385 commit 3850172
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 14 deletions.
10 changes: 10 additions & 0 deletions src/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ err_code_t Client::get(const char* const* keys, const size_t* keyLens, size_t nK
dispatchRetrieval(GET_OP, keys, keyLens, nKeys);
err_code_t rv = waitPoll();
collectRetrievalResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -50,6 +51,7 @@ err_code_t Client::gets(const char* const* keys, const size_t* keyLens, size_t n
dispatchRetrieval(GETS_OP, keys, keyLens, nKeys);
err_code_t rv = waitPoll();
collectRetrievalResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand Down Expand Up @@ -101,6 +103,7 @@ err_code_t Client::M(const char* const* keys, const size_t* keyLens, \
valLens, nItems); \
err_code_t rv = waitPoll(); \
collectMessageResult(results, nResults); \
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv)); \
return rv;\
}

Expand All @@ -118,6 +121,7 @@ err_code_t Client::_delete(const char* const* keys, const size_t* keyLens,
dispatchDeletion(keys, keyLens, noreply, nItems);
err_code_t rv = waitPoll();
collectMessageResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -144,6 +148,7 @@ err_code_t Client::version(broadcast_result_t** results, size_t* nHosts) {
broadcastCommand(keywords::kVERSION, 7);
err_code_t rv = waitPoll();
collectBroadcastResult(results, nHosts);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -152,6 +157,7 @@ err_code_t Client::quit() {
broadcastCommand(keywords::kQUIT, 4, true);
err_code_t rv = waitPoll();
markDeadAll(NULL, keywords::kCONN_QUIT);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -160,6 +166,7 @@ err_code_t Client::stats(broadcast_result_t** results, size_t* nHosts) {
broadcastCommand(keywords::kSTATS, 5);
err_code_t rv = waitPoll();
collectBroadcastResult(results, nHosts);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -170,6 +177,7 @@ err_code_t Client::touch(const char* const* keys, const size_t* keyLens,
dispatchTouch(keys, keyLens, exptime, noreply, nItems);
err_code_t rv = waitPoll();
collectMessageResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -193,6 +201,7 @@ err_code_t Client::incr(const char* key, const size_t keyLen, const uint64_t del
dispatchIncrDecr(INCR_OP, key, keyLen, delta, noreply);
err_code_t rv = waitPoll();
collectUnsignedResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand All @@ -203,6 +212,7 @@ err_code_t Client::decr(const char* key, const size_t keyLen, const uint64_t del
dispatchIncrDecr(DECR_OP, key, keyLen, delta, noreply);
err_code_t rv = waitPoll();
collectUnsignedResult(results, nResults);
log_info_if(rv != RET_OK, "[I: %p], err code %s", this, errCodeToString(rv));
return rv;
}

Expand Down
21 changes: 11 additions & 10 deletions src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,15 @@ void Connection::close() {

bool Connection::tryReconnect() {
if (!m_alive) {
time_t now;
time(&now);
time_t now = time(NULL);
if (now >= m_deadUntil) {
int rv = this->connect();
if (rv == 0) {
if (m_deadUntil > 0) {
log_info("Connection %s is back to live at %lu", m_name, now);
}
log_info_if(m_deadUntil > 0, "[I: %p] %s is back to live at %lu", this, m_name, now);
m_deadUntil = 0;
} else {
m_deadUntil = now + m_retryTimeout;
// log_info("%s is still dead", m_name);
log_info("[I: %p] %s is still dead", this, m_name);
}
}
}
Expand All @@ -188,14 +185,16 @@ void Connection::markDead(const char* reason, int delay) {
m_deadUntil += delay; // check after `delay` seconds, default 0
this->close();
if (strcmp(reason, keywords::kCONN_QUIT) != 0) {
log_warn("Connection %s is dead(reason: %s, delay: %d), next check at %lu",
m_name, reason, delay, m_deadUntil);
log_warn("[I: %p] %s is dead(reason: %s, delay: %d), next check at %lu",
this, m_name, reason, delay, m_deadUntil);
#ifndef NDEBUG
std::queue<struct iovec>* q = m_parser.getRequestKeys();
if (!q->empty()) {
log_warn("%s: first request key: %.*s", m_name,
log_debug("[I: %p] %s first request key: %.*s", this, m_name,
static_cast<int>(q->front().iov_len),
static_cast<char*>(q->front().iov_base));
}
#endif
}
}
}
Expand All @@ -209,6 +208,7 @@ void Connection::takeBuffer(const char* const buf, size_t buf_len) {
}

void Connection::addRequestKey(const char* const key, const size_t len) {
log_debug("[I: %p] %s add request key: %.*s", this, m_name, static_cast<int>(len), key);
m_parser.addRequestKey(key, len);
}

Expand Down Expand Up @@ -261,7 +261,8 @@ ssize_t Connection::recv() {
size_t bufferSizeAvailable = m_buffer_reader->prepareWriteBlock(bufferSize);
char* writePtr = m_buffer_reader->getWritePtr();
ssize_t bufferSizeActual = ::recv(m_socketFd, writePtr, bufferSizeAvailable, 0);
// log_info("%p recv(%lu) %.*s", this, bufferSizeActual, (int)bufferSizeActual, writePtr);
log_debug("[I: %p] %s recv(%lu)", this, m_name, bufferSizeActual);
// log_debug("[I: %p] %.*s", (int)bufferSizeActual, writePtr);
if (bufferSizeActual > 0) {
m_buffer_reader->commitWrite(bufferSizeActual);
}
Expand Down
8 changes: 6 additions & 2 deletions src/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ void ConnectionPool::dispatchStorage(op_code_t op,
conn->getMessageResults()->reserve(conn->m_counter);
}
}
log_debug("[I: %p] after dispatchStorage, m_nActiveConn: %d", this, m_nActiveConn);
}


Expand Down Expand Up @@ -248,7 +249,7 @@ void ConnectionPool::dispatchRetrieval(op_code_t op, const char* const* keys,
conn->getRetrievalResults()->reserve(conn->m_counter);
}
}
// log_debug("after dispatchRetrieval: m_nActiveConn: %d", this->m_nActiveConn);
log_debug("[I: %p] after dispatchRetrieval, m_nActiveConn: %d", this, m_nActiveConn);
}


Expand Down Expand Up @@ -404,7 +405,7 @@ err_code_t ConnectionPool::waitPoll() {
if (m_nInvalidKey > 0) {
return RET_INVALID_KEY_ERR;
} else {
// hard server error
log_debug("[I: %p] hard server error", this);
return RET_MC_SERVER_ERR;
}
}
Expand Down Expand Up @@ -480,6 +481,7 @@ err_code_t ConnectionPool::waitPoll() {
// POLLIN recv
ssize_t nRecv = conn->recv();
if (nRecv == -1 || nRecv == 0) {
log_warn("[I: %p] recv_error, Connection(%p): %s, nRecv: %zd", this, conn, conn->name(), nRecv);
markDeadConn(conn, keywords::kRECV_ERROR, pollfd_ptr);
ret_code = RET_RECV_ERR;
--m_nActiveConn;
Expand Down Expand Up @@ -625,6 +627,7 @@ void ConnectionPool::setRetryTimeout(int timeout) {


void ConnectionPool::markDeadAll(pollfd_t* pollfds, const char* reason) {
log_warn("[I: %p] markDeadAll(reason: %s), m_nActiveConn: %d", this, reason, m_nActiveConn);
nfds_t fd_idx = 0;
for (std::vector<Connection*>::iterator it = m_activeConns.begin();
it != m_activeConns.end();
Expand All @@ -643,6 +646,7 @@ void ConnectionPool::markDeadAll(pollfd_t* pollfds, const char* reason) {


void ConnectionPool::markDeadConn(Connection* conn, const char* reason, pollfd_t* fd_ptr) {
log_warn("[I: %p] markDeadConn(reason: %s), Connection(%p): %s", this, reason, conn, conn->name());
conn->markDead(reason);
fd_ptr->events &= ~POLLOUT & ~POLLIN;
fd_ptr->fd = conn->socketFd();
Expand Down
2 changes: 1 addition & 1 deletion src/HashkitKetama.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ std::vector<continuum_item_t>::iterator KetamaSelector::getServerIt(const char*
}
} while (--max_iter);
if (max_iter == 0) {
log_warn("no server is avaliable(alive) for key: \"%.*s\"", static_cast<int>(key_len), key);
log_warn("no server is available(alive) for key: \"%.*s\"", static_cast<int>(key_len), key);
return m_continuum.end();
}
} else {
Expand Down
1 change: 0 additions & 1 deletion src/Parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ void PacketParser::setBufferReader(BufferReader* reader) {


void PacketParser::addRequestKey(const char* const key, const size_t len) {
// log_info("add request key: %.*s", static_cast<int>(len), key);
struct iovec iov = {const_cast<char*>(key), len};
m_requestKeys.push(iov);
}
Expand Down

0 comments on commit 3850172

Please sign in to comment.