Skip to content

Commit

Permalink
Merge pull request #487 from rudderlabs/refactor/sdk-2733-handle-outo…
Browse files Browse the repository at this point in the history
…fmemory-error

refactor: catch OutOfMemoryError in RudderCloudModeManager
  • Loading branch information
vgupta98 authored Dec 9, 2024
2 parents 4579fc2 + b8f9b1b commit 0b3759e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,39 @@ public void run() {
final ArrayList<String> messages = new ArrayList<>();
final ExponentialBackOff exponentialBackOff = new ExponentialBackOff(5 * 60); // 5 minutes
while (true) {
// clear lists for reuse
messageIds.clear();
messages.clear();
result = null;
maintainDBThreshold();
long sleepCount = Utils.getSleepDurationInSecond(upTimeInMillis, Utils.getUpTimeInMillis());
RudderLogger.logDebug("CloudModeManager: cloudModeProcessor: Fetching events to flush to server");
synchronized (MessageUploadLock.UPLOAD_LOCK) {
dbManager.fetchCloudModeEventsFromDB(messageIds, messages, config.getFlushQueueSize());
if (messages.size() >= config.getFlushQueueSize() || (!messages.isEmpty() && sleepCount >= config.getSleepTimeOut())) {
// form payload JSON form the list of messages
String payload = FlushUtils.getPayloadFromMessages(messageIds, messages);
RudderLogger.logDebug(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: payload: %s", payload));
RudderLogger.logInfo(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: %d", messageIds.size()));
if (payload != null) {
result = networkManager.sendNetworkRequest(payload, addEndPoint(dataResidencyManager.getDataPlaneUrl(), BATCH_ENDPOINT), RequestMethod.POST, true);
RudderLogger.logInfo(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: ServerResponse: %d", result.statusCode));
if (result.status == NetworkResponses.SUCCESS) {
ReportManager.incrementCloudModeUploadSuccessCounter(messageIds.size());
cleanUpEvents(messageIds);
exponentialBackOff.resetBackOff();
upTimeInMillis = Utils.getUpTimeInMillis();
sleepCount = Utils.getSleepDurationInSecond(upTimeInMillis, Utils.getUpTimeInMillis());
try {
// clear lists for reuse
messageIds.clear();
messages.clear();
result = null;
maintainDBThreshold();
long sleepCount = Utils.getSleepDurationInSecond(upTimeInMillis, Utils.getUpTimeInMillis());
RudderLogger.logDebug("CloudModeManager: cloudModeProcessor: Fetching events to flush to server");
synchronized (MessageUploadLock.UPLOAD_LOCK) {
dbManager.fetchCloudModeEventsFromDB(messageIds, messages, config.getFlushQueueSize());
if (messages.size() >= config.getFlushQueueSize() || (!messages.isEmpty() && sleepCount >= config.getSleepTimeOut())) {
// form payload JSON form the list of messages
String payload = FlushUtils.getPayloadFromMessages(messageIds, messages);
RudderLogger.logDebug(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: payload: %s", payload));
RudderLogger.logInfo(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: %d", messageIds.size()));
if (payload != null) {
result = networkManager.sendNetworkRequest(payload, addEndPoint(dataResidencyManager.getDataPlaneUrl(), BATCH_ENDPOINT), RequestMethod.POST, true);
RudderLogger.logInfo(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: ServerResponse: %d", result.statusCode));
if (result.status == NetworkResponses.SUCCESS) {
ReportManager.incrementCloudModeUploadSuccessCounter(messageIds.size());
cleanUpEvents(messageIds);
exponentialBackOff.resetBackOff();
upTimeInMillis = Utils.getUpTimeInMillis();
sleepCount = Utils.getSleepDurationInSecond(upTimeInMillis, Utils.getUpTimeInMillis());
} else {
incrementCloudModeUploadRetryCounter(1);
}
} else {
incrementCloudModeUploadRetryCounter(1);
cleanUpEvents(messageIds);
}
} else {
cleanUpEvents(messageIds);
}
}
}
RudderLogger.logDebug(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: SleepCount: %d", sleepCount));
try {
RudderLogger.logDebug(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: SleepCount: %d", sleepCount));
if (result == null) {
RudderLogger.logDebug("CloudModeManager: cloudModeProcessor: Sleeping for next: " + config.getEventDispatchSleepInterval() + "ms");
Thread.sleep(config.getEventDispatchSleepInterval());
Expand Down Expand Up @@ -107,6 +107,10 @@ public void run() {
ReportManager.reportError(ex);
RudderLogger.logError(String.format("CloudModeManager: cloudModeProcessor: Exception while trying to send events to Data plane URL %s due to %s", config.getDataPlaneUrl(), ex.getLocalizedMessage()));
Thread.currentThread().interrupt();
} catch (OutOfMemoryError e) {
RudderLogger.logError(String.format("CloudModeManager: cloudModeProcessor: Out of memory error: %s occurred while trying to send events to Data plane URL: %s", e.getLocalizedMessage(), config.getDataPlaneUrl()));
// sleeping the thread for 1s to avoid continuous loop after OOM.
Utils.sleep(1000);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,13 @@ public static String getTimeInReadableFormat(long timeInMillis) {

return timeInReadableFormat.toString();
}

/**
 * Pauses the calling thread for the requested duration.
 *
 * <p>If the thread is interrupted while sleeping, the {@link InterruptedException} is handed to
 * {@code ReportManager.reportError} and the thread's interrupt flag is set again before this
 * method returns. The exception itself is not propagated to the caller.
 *
 * @param timeInMillis how long to sleep, in milliseconds; passed unchanged to
 *                     {@link Thread#sleep(long)}
 */
public static void sleep(long timeInMillis) {
    // Resolved up front; it is the same thread that will be sleeping below.
    final Thread caller = Thread.currentThread();
    try {
        Thread.sleep(timeInMillis);
    } catch (InterruptedException interrupted) {
        ReportManager.reportError(interrupted);
        // Thread.sleep clears the interrupt status when it throws; restore it for callers.
        caller.interrupt();
    }
}
}

0 comments on commit 0b3759e

Please sign in to comment.