Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added retry to replication commands #1699

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/replication/api_command_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,32 @@ AgentConn_t* CreateAgentBase ( const AgentDesc_t& tDesc, int64_t iTimeoutMs )
}

// wrapper of PerformRemoteTasks
// Runs tReq / tReply against every agent in dNodes via PerformRemoteTasks, logs the
// failure text of each agent that reports one, and returns true only when all nodes
// finished and no TlsMsg error is pending. An empty dNodes returns true right away.
// NOTE(review): this listing is a diff shown without +/- markers, so several statements
// appear twice - apparently the removed form first, then the form that replaces it
// (the file header above reports 11 additions & 9 deletions).
bool PerformRemoteTasksWrap ( VectorAgentConn_t & dNodes, RequestBuilder_i & tReq, ReplyParser_i & tReply )
bool PerformRemoteTasksWrap ( VectorAgentConn_t & dNodes, RequestBuilder_i & tReq, ReplyParser_i & tReply, bool bRetry )
{
if ( dNodes.IsEmpty() )
return true;

// retry count handed to PerformRemoteTasks: g_iAgentRetryCount when bRetry is set, -1 otherwise
// NOTE(review): presumably -1 means "keep the agent's own retry setting" - confirm in AgentConn_t::GenericInit
int iQueryRetry = ( bRetry ? g_iAgentRetryCount : -1 );
int iNodes = dNodes.GetLength();
int iFinished = PerformRemoteTasks ( dNodes, &tReq, &tReply );
int iFinished = PerformRemoteTasks ( dNodes, &tReq, &tReply, iQueryRetry );

if ( iFinished!=iNodes )
sphLogDebugRpl ( "%d(%d) nodes finished well", iFinished, iNodes );
// bOk: every node in dNodes finished
bool bOk = ( iFinished==iNodes );
if ( !bOk || TlsMsg::HasErr() )
sphLogDebugRpl ( "%d(%d) nodes finished well, tls msg: %s", iFinished, iNodes, TlsMsg::szError() );
// all nodes finished: drop any error text still held in TlsMsg so it cannot turn the result into a failure
if ( bOk && TlsMsg::HasErr() )
TlsMsg::ResetErr();

StringBuilder_c tTmp ( ";" );
for ( const AgentConn_t * pAgent : dNodes )
{
if ( !pAgent->m_sFailure.IsEmpty() )
{
// every agent failure is logged as a warning: 'addr:port': failure text
sphWarning ( "'%s:%d': %s", pAgent->m_tDesc.m_sAddr.cstr(), pAgent->m_tDesc.m_iPort, pAgent->m_sFailure.cstr() );
tTmp.Appendf ( "'%s:%d': %s", pAgent->m_tDesc.m_sAddr.cstr(), pAgent->m_tDesc.m_iPort, pAgent->m_sFailure.cstr() );
// only when not all nodes finished is the failure also appended to the TlsMsg error
if ( !bOk )
TlsMsg::Err().Appendf ( "'%s:%d': %s", pAgent->m_tDesc.m_sAddr.cstr(), pAgent->m_tDesc.m_iPort, pAgent->m_sFailure.cstr() );
}
}
if ( !tTmp.IsEmpty() )
TlsMsg::Err() << tTmp.cstr();

return iFinished==iNodes && !TlsMsg::HasErr();
// success requires both: all nodes finished and no error recorded in TlsMsg
return ( bOk && !TlsMsg::HasErr() );
}


Expand Down
4 changes: 2 additions & 2 deletions src/replication/api_command_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ struct CustomAgentData_T final: public DefaultQueryResult_t
AgentConn_t* CreateAgentBase ( const AgentDesc_t& tDesc, int64_t iTimeoutMs );

// set to true to see every proto exchange in the log
// NOTE(review): the second definition below appears to be the line added by this change and
// turns the flag on; confirm that enabling verbose exchange logging is intended and not a
// leftover debugging edit.
constexpr bool VERBOSE_LOG = false;
constexpr bool VERBOSE_LOG = true;

template<E_CLUSTER CMD, typename REQUEST = ClusterRequest_t, typename REPLY = EmptyReply_t >
class ClusterCommand_T: public RequestBuilder_i, public ReplyParser_i
Expand Down Expand Up @@ -187,7 +187,7 @@ class ClusterCommand_T: public RequestBuilder_i, public ReplyParser_i
}
};

bool PerformRemoteTasksWrap ( VectorAgentConn_t & dNodes, RequestBuilder_i & tReq, ReplyParser_i & tReply );
bool PerformRemoteTasksWrap ( VectorAgentConn_t & dNodes, RequestBuilder_i & tReq, ReplyParser_i & tReply, bool bRetry );

// handle all incoming API commands.
void HandleAPICommandCluster ( ISphOutputBuffer& tOut, WORD uCommandVer, InputBuffer_c& tBuf, const char* szClient );
2 changes: 1 addition & 1 deletion src/replication/cluster_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ void SendClusterDeleteToNodes ( const VecTraits_T<CSphString>& dNodes, const CSp
ClusterRequest_t tData { sCluster };
ClusterDelete_c tReq;
auto dAgents = tReq.MakeAgents ( GetDescAPINodes ( dNodes, Resolve_e::QUICK ), GetQueryTimeoutForReplication(), tData );
PerformRemoteTasksWrap ( dAgents, tReq, tReq );
PerformRemoteTasksWrap ( dAgents, tReq, tReq, true );
}
2 changes: 1 addition & 1 deletion src/replication/cluster_file_reserve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,5 @@ bool SendClusterFileReserve ( VecRefPtrs_t<AgentConn_t*>& dAgents )
return false;

ClusterFileReserve_c tReq;
return PerformRemoteTasksWrap ( dAgents, tReq, tReq );
return PerformRemoteTasksWrap ( dAgents, tReq, tReq, true );
}
2 changes: 1 addition & 1 deletion src/replication/cluster_get_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ StrVec_t RemoteClusterGetNodes ( VectorAgentConn_t & dAgents )
// submit initial jobs
CSphRefcountedPtr<RemoteAgentsObserver_i> tReporter ( GetObserver ());
ClusterGetNodes_c tReq;
ScheduleDistrJobs ( dAgents, &tReq, &tReq, tReporter );
ScheduleDistrJobs ( dAgents, &tReq, &tReq, tReporter, g_iAgentRetryCount );

bool bDone = false;
while (!bDone)
Expand Down
2 changes: 1 addition & 1 deletion src/replication/cluster_index_add_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ StringBuilder_c& operator<< ( StringBuilder_c& tOut, const ClusterIndexAddLocalR
{
tOut << (const ClusterRequest_t&)tReq;
tOut << "index:" << tReq.m_sIndex;
tOut << "eindex:" << (BYTE)tReq.m_eIndex;
tOut << "type:" << (BYTE)tReq.m_eIndex;
tOut << "SendFilesSuccess:" << tReq.m_bSendFilesSuccess;
return tOut;
}
Expand Down
2 changes: 1 addition & 1 deletion src/replication/cluster_synced.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool SendClusterSynced ( const VecAgentDesc_t& dDesc, const ClusterSyncedRequest
{
ClusterSynced_c tReq;
auto dNodes = tReq.MakeAgents ( dDesc, GetQueryTimeoutForReplication(), tRequest );
return PerformRemoteTasksWrap ( dNodes, tReq, tReq );
return PerformRemoteTasksWrap ( dNodes, tReq, tReq, true );
}

void ReceiveClusterSynced ( ISphOutputBuffer & tOut, InputBuffer_c & tBuf, CSphString& sCluster )
Expand Down
2 changes: 1 addition & 1 deletion src/replication/cluster_update_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool SendClusterUpdateNodes ( const CSphString& sCluster, NODES_E eNodes, const
return true;

ClusterUpdateNodes_c tReq;
return PerformRemoteTasksWrap ( dAgents, tReq, tReq );
return PerformRemoteTasksWrap ( dAgents, tReq, tReq, true );
}


Expand Down
4 changes: 2 additions & 2 deletions src/replication/replicate_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static bool ActivateIndexOnRemotes ( const CSphString& sCluster, const CSphStrin
sphLogDebugRpl ( "sent table '%s' %s to %d nodes with timeout %d.%03d sec", sIndex.cstr(), ( bSendOk ? "loading" : "rollback" ), dNodes.GetLength(), (int)( tmLongOpTimeout / 1000 ), (int)( tmLongOpTimeout % 1000 ) );

ClusterIndexAddLocal_c tReq;
if ( !PerformRemoteTasksWrap ( dNodes, tReq, tReq ) )
if ( !PerformRemoteTasksWrap ( dNodes, tReq, tReq, true ) )
return false;

sphLogDebugRpl ( "remote table '%s' %s", sIndex.cstr(), ( bSendOk ? "added" : "rolled-back" ) );
Expand Down Expand Up @@ -289,7 +289,7 @@ bool ReplicateDistIndexToNodes ( const CSphString & sCluster, const CSphString &

sphLogDebugRpl ( "sending table '%s' to %d nodes with timeout %d.%03d sec", sIndex.cstr(), dNodes.GetLength(), (int)( tmTimeout / 1000 ), (int)( tmTimeout % 1000 ) );

return PerformRemoteTasksWrap ( dNodes, tReq, tReq );
return PerformRemoteTasksWrap ( dNodes, tReq, tReq, true );
}

static bool AddDistIndex ( const DistIndexSendRequest_t & tCmd )
Expand Down
4 changes: 2 additions & 2 deletions src/searchdha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2175,8 +2175,8 @@ static bool RunRemoteTask ( AgentConn_t * pConnection, RequestBuilder_i * pQuery
void AgentConn_t::GenericInit ( RequestBuilder_i * pQuery, ReplyParser_i * pParser,
Reporter_i * pReporter, int iQueryRetry, int iQueryDelay )
{
sphLogDebugA ( "%d GenericInit() pBuilder %p, parser %p, retries %d, delay %d, ref=%d",
m_iStoreTag, pQuery, pParser, iQueryRetry, iQueryDelay, ( int ) GetRefcount ());
sphLogDebugA ( "%d GenericInit() pBuilder %p, parser %p, retries %d(%d), delay %d(%d), ref=%d",
m_iStoreTag, pQuery, pParser, iQueryRetry, m_iRetries, iQueryDelay, m_iDelay, ( int ) GetRefcount ());
if ( iQueryDelay>=0 )
m_iDelay = iQueryDelay;

Expand Down