From 5fc3a313d823f10314129641f87b2e433d1211e9 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Fri, 7 Jun 2024 12:58:25 -0300 Subject: [PATCH] SDKS-8457 --- .../split/engine/common/PushManagerImp.java | 50 ++++++++----- .../io/split/engine/sse/client/SSEClient.java | 73 ++++++++++--------- 2 files changed, 71 insertions(+), 52 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/PushManagerImp.java b/client/src/main/java/io/split/engine/common/PushManagerImp.java index 27b535d0..f787bf66 100644 --- a/client/src/main/java/io/split/engine/common/PushManagerImp.java +++ b/client/src/main/java/io/split/engine/common/PushManagerImp.java @@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkNotNull; import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor; @@ -42,6 +44,8 @@ public class PushManagerImp implements PushManager { private final FeatureFlagsWorker _featureFlagsWorker; private final Worker _segmentWorker; private final PushStatusTracker _pushStatusTracker; + private static final Lock startLock = new ReentrantLock(); + private static final Lock stopLock = new ReentrantLock(); private Future _nextTokenRefreshTask; private final ScheduledExecutorService _scheduledExecutorService; @@ -92,28 +96,38 @@ public static PushManagerImp build(Synchronizer synchronizer, } @Override - public synchronized void start() { - AuthenticationResponse response = _authApiClient.Authenticate(); - _log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled())); - if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) { - _expirationTime.set(response.getExpiration()); - _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(), - response.getExpiration(), System.currentTimeMillis())); - return; - } - - cleanUpResources(); - if (response.isRetry()) { - _pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); - } else { - _pushStatusTracker.forcePushDisable(); + public void start() { + try { + startLock.lock(); + AuthenticationResponse response = _authApiClient.Authenticate(); + _log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled())); + if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) { + _expirationTime.set(response.getExpiration()); + _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(), + response.getExpiration(), System.currentTimeMillis())); + return; + } + + cleanUpResources(); + if (response.isRetry()) { + _pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); + } else { + _pushStatusTracker.forcePushDisable(); + } + } finally { + startLock.unlock(); } } @Override - public synchronized void stop() { - _log.debug("Stopping PushManagerImp"); - cleanUpResources(); + public void stop() { + try { + stopLock.lock(); + _log.debug("Stopping PushManagerImp"); + cleanUpResources(); + } finally { + stopLock.unlock(); + } } @Override diff --git a/client/src/main/java/io/split/engine/sse/client/SSEClient.java b/client/src/main/java/io/split/engine/sse/client/SSEClient.java index 2a87d8e9..608d7f59 100644 --- a/client/src/main/java/io/split/engine/sse/client/SSEClient.java +++ b/client/src/main/java/io/split/engine/sse/client/SSEClient.java @@ -6,7 +6,6 @@ import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; import org.apache.hc.client5.http.classic.methods.HttpGet; -import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; import org.slf4j.Logger; @@ -25,6 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static com.google.common.base.Preconditions.checkNotNull; @@ -49,6 +50,8 @@ private enum ConnectionState { private final static String SOCKET_CLOSED_MESSAGE = "Socket closed"; private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n"; private final static long CONNECT_TIMEOUT = 30000; + private static final Lock openLock = new ReentrantLock(); + private static final Lock closeLock = new ReentrantLock(); private static final Logger _log = LoggerFactory.getLogger(SSEClient.class); private final ExecutorService _connectionExecutor; private final CloseableHttpClient _client; @@ -60,7 +63,6 @@ private enum ConnectionState { private AtomicBoolean _forcedStop; private final RequestDecorator _requestDecorator; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; - private final AtomicBoolean openGuard = new AtomicBoolean(false); public SSEClient(Function eventCallback, Function statusCallback, @@ -78,53 +80,56 @@ public SSEClient(Function eventCallback, } public boolean open(URI uri) { - if (isOpen()) { - _log.info("SSEClient already open."); - return false; - } - - if (!openGuard.compareAndSet(false, true)) { - _log.debug("Open SSEClient already running"); - return false; - } - - _statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS); - - CountDownLatch signal = new CountDownLatch(1); - _connectionExecutor.submit(() -> connectAndLoop(uri, signal)); try { - if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) { + openLock.lock(); + if (isOpen()) { + _log.info("SSEClient already open."); return false; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if(e.getMessage() == null){ - _log.info("The thread was interrupted while opening SSEClient"); + + _statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS); + + CountDownLatch signal = new CountDownLatch(1); + _connectionExecutor.submit(() -> connectAndLoop(uri, signal)); + try { + if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) { + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if(e.getMessage() == null){ + _log.info("The thread was interrupted while opening SSEClient"); + return false; + } + _log.info(e.getMessage()); return false; } - _log.info(e.getMessage()); - return false; + return isOpen(); } finally { - openGuard.set(false); + openLock.unlock(); } - return isOpen(); } public boolean isOpen() { return (ConnectionState.OPEN.equals(_state.get())); } - public synchronized void close() { - _forcedStop.set(true); - if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) { - if (_ongoingResponse.get() != null) { - try { - _ongoingRequest.get().abort(); - _ongoingResponse.get().close(); - } catch (IOException e) { - _log.debug(String.format("SSEClient close forced: %s", e.getMessage())); + public void close() { + try { + closeLock.lock(); + _forcedStop.set(true); + if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) { + if (_ongoingResponse.get() != null) { + try { + _ongoingRequest.get().abort(); + _ongoingResponse.get().close(); + } catch (IOException e) { + _log.debug(String.format("SSEClient close forced: %s", e.getMessage())); + } } } + } finally { + closeLock.unlock(); } }