Skip to content

Commit

Permalink
Merge pull request #156 from yma96/main
Browse files Browse the repository at this point in the history
Reconnect Cassandra client and reinit session when NoHostAvailableException
  • Loading branch information
yma96 authored Oct 24, 2024
2 parents ba4f2d0 + 5926722 commit 3e5e67a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -132,7 +133,7 @@ public void recordLog( final StoreKey storeKey, final String ops, final String c
public List<DtxRepoOpsAuditRecord> getAuditLogByRepo( final String key, final int limit )
{
BoundStatement bound = preparedStoreAuditQueryByRepo.bind( key, limit );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );

List<DtxRepoOpsAuditRecord> records = new ArrayList<>();
result.forEach( row -> records.add( toDtxRepoOpsAuditRecord( row ) ) );
Expand All @@ -143,7 +144,7 @@ public List<DtxRepoOpsAuditRecord> getAuditLogByRepo( final String key, final in
public List<DtxRepoOpsAuditRecord> getAuditLogByRepoAndOps( final String key, final String ops, final int limit )
{
BoundStatement bound = preparedStoreAuditQueryByRepoAndOps.bind( key, ops, limit );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );

List<DtxRepoOpsAuditRecord> records = new ArrayList<>();
result.forEach( row -> records.add( toDtxRepoOpsAuditRecord( row ) ) );
Expand Down Expand Up @@ -187,4 +188,36 @@ private DtxRepoOpsAuditRecord toDtxRepoOpsAuditRecord( final Row row )
record.setChangeContent( row.getString( "changecontent" ) );
return record;
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,15 @@ public Session getSession( String keyspace )
} );
}

private volatile boolean closed;

public void close()
{
if ( !closed && cluster != null )
if ( cluster != null )
{
logger.info( "Close cassandra client" );
sessions.forEach( ( key, value ) -> value.close() );
sessions.clear();
cluster.close();
cluster = null;
closed = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.commonjava.indy.service.repository.model.StoreKey;
Expand Down Expand Up @@ -138,7 +139,7 @@ public DtxArtifactStore getArtifactStore( String packageType, StoreType type, St
BoundStatement bound =
preparedSingleArtifactStoreQuery.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ),
CassandraStoreUtil.getHashPrefix( name ), name );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return toDtxArtifactStore( result.one() );
}

Expand All @@ -147,7 +148,7 @@ public Set<DtxArtifactStore> getArtifactStoresByPkgAndType( String packageType,

BoundStatement bound =
preparedArtifactStoresQueryByKeys.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );

Set<DtxArtifactStore> dtxArtifactStoreSet = new HashSet<>();
result.forEach( row -> dtxArtifactStoreSet.add( toDtxArtifactStore( row ) ) );
Expand All @@ -159,7 +160,7 @@ public Set<DtxArtifactStore> getAllArtifactStores()
{

BoundStatement bound = preparedArtifactStoresQuery.bind();
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );

Set<DtxArtifactStore> dtxArtifactStoreSet = new HashSet<>();
result.forEach( row -> dtxArtifactStoreSet.add( toDtxArtifactStore( row ) ) );
Expand All @@ -170,7 +171,7 @@ public Set<DtxArtifactStore> getAllArtifactStores()
public Boolean isEmpty()
{
BoundStatement bound = preparedArtifactStoresQuery.bind();
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.one() == null;
}

Expand All @@ -182,7 +183,7 @@ public DtxArtifactStore removeArtifactStore( String packageType, StoreType type,
BoundStatement bound =
preparedArtifactStoreDel.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ),
CassandraStoreUtil.getHashPrefix( name ), name );
session.execute( bound );
executeSession( bound );
}
return dtxArtifactStore;
}
Expand Down Expand Up @@ -220,7 +221,7 @@ public void createDtxArtifactStore( DtxArtifactStore dtxArtifactStore )
public DtxAffectedStore getAffectedStore( StoreKey key )
{
BoundStatement bound = preparedAffectedStoresQuery.bind( key.toString() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return toDtxAffectedStore( result.one() );
}

Expand All @@ -246,7 +247,7 @@ public void addAffectedBy( StoreKey storeKey, StoreKey affected )
increment.add( affected.toString() );
bound.setSet( 0, increment );
bound.setString( 1, storeKey.toString() );
session.execute( bound );
executeSession( bound );
}

public void removeAffectedBy( StoreKey storeKey, StoreKey affected )
Expand All @@ -257,13 +258,13 @@ public void removeAffectedBy( StoreKey storeKey, StoreKey affected )
reduction.add( affected.toString() );
bound.setSet( 0, reduction );
bound.setString( 1, storeKey.toString() );
session.execute( bound );
executeSession( bound );
}

public Boolean isAffectedEmpty()
{
BoundStatement bound = preparedAffectedStoreExistedQuery.bind();
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.one() == null;
}

Expand All @@ -273,7 +274,39 @@ public void removeAffectedStore( StoreKey key )
if ( affectedStore != null )
{
BoundStatement bound = preparedAffectedStoreDel.bind( key.toString() );
session.execute( bound );
executeSession( bound );
}
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}

0 comments on commit 3e5e67a

Please sign in to comment.