Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PushManager and SyncManager #507

Merged
merged 6 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4.12.1 (Jun 10, 2024)
- Fixed deadlock for virtual thread in Push Manager and SSE Client.

4.12.0 (May 15, 2024)
- Added support for targeting rules based on semantic versions (https://semver.org/).
- Added the logic to handle correctly when the SDK receives an unsupported Matcher type.
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down
67 changes: 42 additions & 25 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor;
Expand All @@ -42,6 +44,7 @@ public class PushManagerImp implements PushManager {
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;
private static final Lock lock = new ReentrantLock();

private Future<?> _nextTokenRefreshTask;
private final ScheduledExecutorService _scheduledExecutorService;
Expand Down Expand Up @@ -92,37 +95,42 @@ public static PushManagerImp build(Synchronizer synchronizer,
}

@Override
public synchronized void start() {
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_expirationTime.set(response.getExpiration());
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

stop();
if (response.isRetry()) {
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
} else {
_pushStatusTracker.forcePushDisable();
public void start() {
try {
lock.lock();
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_expirationTime.set(response.getExpiration());
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

cleanUpResources();
if (response.isRetry()) {
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
} else {
_pushStatusTracker.forcePushDisable();
}
} finally {
lock.unlock();
}
}

@Override
public synchronized void stop() {
_log.debug("Stopping PushManagerImp");
_eventSourceClient.stop();
stopWorkers();
if (_nextTokenRefreshTask != null) {
_log.debug("Cancel nextTokenRefreshTask");
_nextTokenRefreshTask.cancel(false);
public void stop() {
try {
lock.lock();
_log.debug("Stopping PushManagerImp");
cleanUpResources();
} finally {
lock.unlock();
}
}

@Override
public synchronized void scheduleConnectionReset() {
public void scheduleConnectionReset() {
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
_log.debug("Starting scheduleNextTokenRefresh ...");
Expand All @@ -142,14 +150,23 @@ private boolean startSse(String token, String channels) {
}

@Override
public synchronized void startWorkers() {
public void startWorkers() {
_featureFlagsWorker.start();
_segmentWorker.start();
}

@Override
public synchronized void stopWorkers() {
public void stopWorkers() {
_featureFlagsWorker.stop();
_segmentWorker.stop();
}

private void cleanUpResources() {
_eventSourceClient.stop();
stopWorkers();
if (_nextTokenRefreshTask != null) {
_log.debug("Cancel nextTokenRefreshTask");
_nextTokenRefreshTask.cancel(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ private void startPollingMode() {
long howLong = _backoff.interval();
_log.info(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong));
_synchronizer.startPeriodicFetching();
_pushManager.stopWorkers();
_pushManager.stop();
Thread.sleep(howLong * 1000);
_incomingPushStatus.clear();
Expand Down
69 changes: 40 additions & 29 deletions client/src/main/java/io/split/engine/sse/client/SSEClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.slf4j.Logger;
Expand All @@ -25,6 +24,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -49,6 +50,7 @@ private enum ConnectionState {
private final static String SOCKET_CLOSED_MESSAGE = "Socket closed";
private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
private final static long CONNECT_TIMEOUT = 30000;
private static final Lock lock = new ReentrantLock();
private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
private final ExecutorService _connectionExecutor;
private final CloseableHttpClient _client;
Expand All @@ -59,7 +61,6 @@ private enum ConnectionState {
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
private AtomicBoolean _forcedStop;
private final RequestDecorator _requestDecorator;

private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

public SSEClient(Function<RawEvent, Void> eventCallback,
Expand All @@ -77,47 +78,57 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
_requestDecorator = requestDecorator;
}

public synchronized boolean open(URI uri) {
if (isOpen()) {
_log.info("SSEClient already open.");
return false;
}

_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);

CountDownLatch signal = new CountDownLatch(1);
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
public boolean open(URI uri) {
try {
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
lock.lock();
if (isOpen()) {
_log.info("SSEClient already open.");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(e.getMessage() == null){
_log.info("The thread was interrupted while opening SSEClient");

_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);

CountDownLatch signal = new CountDownLatch(1);
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
try {
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(e.getMessage() == null){
_log.info("The thread was interrupted while opening SSEClient");
return false;
}
_log.info(e.getMessage());
return false;
}
_log.info(e.getMessage());
return false;
return isOpen();
} finally {
lock.unlock();
}
return isOpen();
}

public boolean isOpen() {
return (ConnectionState.OPEN.equals(_state.get()));
}

public synchronized void close() {
_forcedStop.set(true);
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
if (_ongoingResponse.get() != null) {
try {
_ongoingRequest.get().abort();
_ongoingResponse.get().close();
} catch (IOException e) {
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
public void close() {
try {
lock.lock();
_forcedStop.set(true);
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
if (_ongoingResponse.get() != null) {
try {
_ongoingRequest.get().abort();
_ongoingResponse.get().close();
} catch (IOException e) {
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
}
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ public class PushManagerTest {
private PushManager _pushManager;
private PushStatusTracker _pushStatusTracker;
private TelemetryStorage _telemetryStorage;
private FeatureFlagsWorker _featureFlagsWorker;
private SegmentsWorkerImp _segmentsWorkerImp;

@Before
public void setUp() {
_featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class);
_segmentsWorkerImp = Mockito.mock(SegmentsWorkerImp.class);
_authApiClient = Mockito.mock(AuthApiClient.class);
_eventSourceClient = Mockito.mock(EventSourceClient.class);
_backoff = Mockito.mock(Backoff.class);
_pushStatusTracker = Mockito.mock(PushStatusTrackerImp.class);
_telemetryStorage = new InMemoryTelemetryStorage();
_pushManager = new PushManagerImp(_authApiClient,
_eventSourceClient,
Mockito.mock(FeatureFlagsWorker.class),
Mockito.mock(SegmentsWorkerImp.class),
_featureFlagsWorker,
_segmentsWorkerImp,
_pushStatusTracker,
_telemetryStorage,
null);
Expand Down Expand Up @@ -107,4 +111,60 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
Thread.sleep(1500);
Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
}


@Test
public void startAndStop() throws InterruptedException {
AuthenticationResponse response = new AuthenticationResponse(true, "token-test", "channels-test", 1, false);

Mockito.when(_authApiClient.Authenticate())
.thenReturn(response);

Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
.thenReturn(true);

_pushManager.start();

Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate();
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken());

Thread.sleep(1500);

Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
Assert.assertEquals(1, _telemetryStorage.popStreamingEvents().size());

_pushManager.stop();

Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).stop();
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).stop();
}

@Test
public void validateStartWorkers() {
_pushManager.startWorkers();
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).start();
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).start();
}

@Test
public void validateScheduleConnectionReset() throws InterruptedException {
AuthenticationResponse response = new AuthenticationResponse(false, "token-test", "channels-test", 3, false);

Mockito.when(_authApiClient.Authenticate())
.thenReturn(response);

Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
.thenReturn(true);

_pushManager.start();

_pushManager.scheduleConnectionReset();
Thread.sleep(1000);

Mockito.verify(_eventSourceClient, Mockito.times(3)).stop();
Mockito.verify(_featureFlagsWorker, Mockito.times(3)).stop();
Mockito.verify(_segmentsWorkerImp, Mockito.times(3)).stop();
}
}
2 changes: 1 addition & 1 deletion pluggable-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>

<version>2.1.0</version>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion redis-wrapper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>redis-wrapper</artifactId>
<version>3.1.0</version>
Expand Down
2 changes: 1 addition & 1 deletion testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>java-client-testing</artifactId>
<packaging>jar</packaging>
Expand Down
Loading