From 592672244598580c916b69bc8c93c36b89e27acd Mon Sep 17 00:00:00 2001 From: yma Date: Wed, 23 Oct 2024 19:49:54 +0800 Subject: [PATCH] Reconnect Cassandra client and reinit session when NoHostAvailableException --- .../change/audit/StoreAuditManager.java | 37 ++++++++++++- .../data/cassandra/CassandraClient.java | 5 +- .../data/cassandra/CassandraStoreQuery.java | 53 +++++++++++++++---- 3 files changed, 79 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/commonjava/indy/service/repository/change/audit/StoreAuditManager.java b/src/main/java/org/commonjava/indy/service/repository/change/audit/StoreAuditManager.java index 53dff5d..281202b 100644 --- a/src/main/java/org/commonjava/indy/service/repository/change/audit/StoreAuditManager.java +++ b/src/main/java/org/commonjava/indy/service/repository/change/audit/StoreAuditManager.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import org.apache.commons.lang3.StringUtils; @@ -132,7 +133,7 @@ public void recordLog( final StoreKey storeKey, final String ops, final String c public List getAuditLogByRepo( final String key, final int limit ) { BoundStatement bound = preparedStoreAuditQueryByRepo.bind( key, limit ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); List records = new ArrayList<>(); result.forEach( row -> records.add( toDtxRepoOpsAuditRecord( row ) ) ); @@ -143,7 +144,7 @@ public List getAuditLogByRepo( final String key, final in public List getAuditLogByRepoAndOps( final String key, final String ops, final int limit ) { BoundStatement bound = preparedStoreAuditQueryByRepoAndOps.bind( key, ops, limit ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); List records = new ArrayList<>(); result.forEach( row -> records.add( toDtxRepoOpsAuditRecord( row ) ) ); @@ -187,4 +188,36 @@ private DtxRepoOpsAuditRecord toDtxRepoOpsAuditRecord( final Row row ) record.setChangeContent( row.getString( "changecontent" ) ); return record; } + + private ResultSet executeSession ( BoundStatement bind ) + { + boolean exception = false; + ResultSet trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + client.close(); + client.init(); + this.init(); + } + trackingRecord = session.execute( bind ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + client.close(); + client.init(); + this.init(); + trackingRecord = session.execute( bind ); + } + } + return trackingRecord; + } } diff --git a/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraClient.java b/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraClient.java index 3944ed9..d59c6f5 100644 --- a/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraClient.java +++ b/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraClient.java @@ -116,18 +116,15 @@ public Session getSession( String keyspace ) } ); } - private volatile boolean closed; - public void close() { - if ( !closed && cluster != null ) + if ( cluster != null ) { logger.info( "Close cassandra client" ); sessions.forEach( ( key, value ) -> value.close() ); sessions.clear(); cluster.close(); cluster = null; - closed = true; } } diff --git a/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraStoreQuery.java b/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraStoreQuery.java index 4100c33..362992b 100644 --- a/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraStoreQuery.java +++ b/src/main/java/org/commonjava/indy/service/repository/data/cassandra/CassandraStoreQuery.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import org.commonjava.indy.service.repository.model.StoreKey; @@ -138,7 +139,7 @@ public DtxArtifactStore getArtifactStore( String packageType, StoreType type, St BoundStatement bound = preparedSingleArtifactStoreQuery.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ), CassandraStoreUtil.getHashPrefix( name ), name ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return toDtxArtifactStore( result.one() ); } @@ -147,7 +148,7 @@ public Set getArtifactStoresByPkgAndType( String packageType, BoundStatement bound = preparedArtifactStoresQueryByKeys.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ) ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Set dtxArtifactStoreSet = new HashSet<>(); result.forEach( row -> dtxArtifactStoreSet.add( toDtxArtifactStore( row ) ) ); @@ -159,7 +160,7 @@ public Set getAllArtifactStores() { BoundStatement bound = preparedArtifactStoresQuery.bind(); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Set dtxArtifactStoreSet = new HashSet<>(); result.forEach( row -> dtxArtifactStoreSet.add( toDtxArtifactStore( row ) ) ); @@ -170,7 +171,7 @@ public Set getAllArtifactStores() public Boolean isEmpty() { BoundStatement bound = preparedArtifactStoresQuery.bind(); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return result.one() == null; } @@ -182,7 +183,7 @@ public DtxArtifactStore removeArtifactStore( String packageType, StoreType type, BoundStatement bound = preparedArtifactStoreDel.bind( CassandraStoreUtil.getTypeKey( packageType, type.name() ), CassandraStoreUtil.getHashPrefix( name ), name ); - session.execute( bound ); + executeSession( bound ); } return dtxArtifactStore; } @@ -220,7 +221,7 @@ public void createDtxArtifactStore( DtxArtifactStore dtxArtifactStore ) public DtxAffectedStore getAffectedStore( StoreKey key ) { BoundStatement bound = preparedAffectedStoresQuery.bind( key.toString() ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return toDtxAffectedStore( result.one() ); } @@ -246,7 +247,7 @@ public void addAffectedBy( StoreKey storeKey, StoreKey affected ) increment.add( affected.toString() ); bound.setSet( 0, increment ); bound.setString( 1, storeKey.toString() ); - session.execute( bound ); + executeSession( bound ); } public void removeAffectedBy( StoreKey storeKey, StoreKey affected ) @@ -257,13 +258,13 @@ public void removeAffectedBy( StoreKey storeKey, StoreKey affected ) reduction.add( affected.toString() ); bound.setSet( 0, reduction ); bound.setString( 1, storeKey.toString() ); - session.execute( bound ); + executeSession( bound ); } public Boolean isAffectedEmpty() { BoundStatement bound = preparedAffectedStoreExistedQuery.bind(); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return result.one() == null; } @@ -273,7 +274,39 @@ public void removeAffectedStore( StoreKey key ) if ( affectedStore != null ) { BoundStatement bound = preparedAffectedStoreDel.bind( key.toString() ); - session.execute( bound ); + executeSession( bound ); } } + + private ResultSet executeSession ( BoundStatement bind ) + { + boolean exception = false; + ResultSet trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + client.close(); + client.init(); + this.init(); + } + trackingRecord = session.execute( bind ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + client.close(); + client.init(); + this.init(); + trackingRecord = session.execute( bind ); + } + } + return trackingRecord; + } }