Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memory issue crash happening on CloudModeProcessor #544

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 39 additions & 46 deletions Sources/Classes/RSCloudModeManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -39,55 +39,48 @@ - (void) startCloudModeProcessor {
[RSLogger logDebug:@"RSCloudModeManager: CloudModeProcessor: Starting the Cloud Mode Processor"];
int sleepCount = 0;
while (YES) {
@try {
[strongSelf->lock lock];
RSNetworkResponse* response = nil;
[strongSelf->dbPersistentManager clearOldEventsWithThreshold: strongSelf->config.dbCountThreshold];
[RSLogger logDebug:@"RSCloudModeManager: CloudModeProcessor: Fetching events to flush to server"];
RSDBMessage* dbMessage = [strongSelf->dbPersistentManager fetchEventsFromDB:(strongSelf->config.flushQueueSize) ForMode:CLOUDMODE];
if ((dbMessage.messages.count >= strongSelf->config.flushQueueSize) || (dbMessage.messages.count > 0 && (sleepCount >= strongSelf->config.sleepTimeout))) {
NSString* payload = [RSCloudModeManager getPayloadFromMessages:dbMessage];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Payload: %@", payload]];
[RSLogger logInfo:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: EventCount: %lu", (unsigned long)dbMessage.messageIds.count]];
[RSMetricsReporter report:SDKMETRICS_CM_EVENT forMetricType:COUNT withProperties:@{SDKMETRICS_TYPE: SDKMETRICS_MESSAGES} andValue:(float)dbMessage.messages.count];
if (payload != nil) {
response = [strongSelf->networkManager sendNetworkRequest:payload toEndpoint:BATCH_ENDPOINT withRequestMethod:POST];
if (response.state == NETWORK_SUCCESS) {
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Updating status as CLOUDMODEPROCESSING DONE for events (%@)",[RSUtils getCSVStringFromArray:dbMessage.messageIds]]];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_SUCCESS forMetricType:COUNT withProperties:nil andValue:(float)dbMessage.messages.count];
[strongSelf->dbPersistentManager updateEventsWithIds:dbMessage.messageIds withStatus:CLOUD_MODE_PROCESSING_DONE];
[strongSelf->dbPersistentManager clearProcessedEventsFromDB];
sleepCount = 0;
[self->backOff reset];
}
[strongSelf->lock lock];
RSNetworkResponse* response = nil;
[strongSelf->dbPersistentManager clearOldEventsWithThreshold: strongSelf->config.dbCountThreshold];
[RSLogger logDebug:@"RSCloudModeManager: CloudModeProcessor: Fetching events to flush to server"];
RSDBMessage* dbMessage = [strongSelf->dbPersistentManager fetchEventsFromDB:(strongSelf->config.flushQueueSize) ForMode:CLOUDMODE];
if ((dbMessage.messages.count >= strongSelf->config.flushQueueSize) || (dbMessage.messages.count > 0 && (sleepCount >= strongSelf->config.sleepTimeout))) {
NSString* payload = [RSCloudModeManager getPayloadFromMessages:dbMessage];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Payload: %@", payload]];
[RSLogger logInfo:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: EventCount: %lu", (unsigned long)dbMessage.messageIds.count]];
[RSMetricsReporter report:SDKMETRICS_CM_EVENT forMetricType:COUNT withProperties:@{SDKMETRICS_TYPE: SDKMETRICS_MESSAGES} andValue:(float)dbMessage.messages.count];
if (payload != nil) {
response = [strongSelf->networkManager sendNetworkRequest:payload toEndpoint:BATCH_ENDPOINT withRequestMethod:POST];
if (response.state == NETWORK_SUCCESS) {
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Updating status as CLOUDMODEPROCESSING DONE for events (%@)",[RSUtils getCSVStringFromArray:dbMessage.messageIds]]];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_SUCCESS forMetricType:COUNT withProperties:nil andValue:(float)dbMessage.messages.count];
[strongSelf->dbPersistentManager updateEventsWithIds:dbMessage.messageIds withStatus:CLOUD_MODE_PROCESSING_DONE];
[strongSelf->dbPersistentManager clearProcessedEventsFromDB];
sleepCount = 0;
[self->backOff reset];
}
}
[strongSelf->lock unlock];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: cloudModeSleepCount: %d", sleepCount]];
sleepCount += 1;

if(response == nil) {
sleep(1);
} else if (response.state == WRONG_WRITE_KEY) {
[RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Wrong WriteKey. Aborting the Cloud Mode Processor"];
break;
} else if (response.state == INVALID_URL) {
[RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Invalid Data Plane URL. Aborting the Cloud Mode Processor"];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_ABORT forMetricType:COUNT withProperties:@{SDKMETRICS_TYPE: SDKMETRICS_DATA_PLANE_URL_INVALID} andValue:1];
break;
} else if (response.state == NETWORK_ERROR) {
int delay = [self->backOff nextDelay];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Retrying in: %@", [RSUtils secondsToString:delay]]];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_RETRY forMetricType:COUNT withProperties:nil andValue:1];
sleep(delay);
} else { // To handle the status code RESOURCE_NOT_FOUND(404) & BAD_REQUEST(400)
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Retrying in: 1s"]];
sleep(1);
}
}
@catch (NSException *exception) {
[strongSelf->lock unlock];
[RSLogger logError:[NSString stringWithFormat:@"RSCloudModeManager: CloudModeProcessor: Failed to flush current batch, reason: %@", exception.reason]];
[strongSelf->lock unlock];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: cloudModeSleepCount: %d", sleepCount]];
sleepCount += 1;

if(response == nil) {
sleep(1);
} else if (response.state == WRONG_WRITE_KEY) {
[RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Wrong WriteKey. Aborting the Cloud Mode Processor"];
break;
} else if (response.state == INVALID_URL) {
[RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Invalid Data Plane URL. Aborting the Cloud Mode Processor"];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_ABORT forMetricType:COUNT withProperties:@{SDKMETRICS_TYPE: SDKMETRICS_DATA_PLANE_URL_INVALID} andValue:1];
break;
} else if (response.state == NETWORK_ERROR) {
int delay = [self->backOff nextDelay];
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Retrying in: %@", [RSUtils secondsToString:delay]]];
[RSMetricsReporter report:SDKMETRICS_CM_ATTEMPT_RETRY forMetricType:COUNT withProperties:nil andValue:1];
sleep(delay);
} else { // To handle the status code RESOURCE_NOT_FOUND(404) & BAD_REQUEST(400)
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Retrying in: 1s"]];
sleep(1);
}
}
Expand Down
80 changes: 43 additions & 37 deletions Sources/Classes/RSDBPersistentManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -490,48 +490,54 @@ -(RSDBMessage*) fetchAllEventsFromDBForMode:(MODES) mode {
}

- (RSDBMessage *) getEventsFromDB :(NSString*) querySQLString {
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSDBPersistentManager: getEventsFromDB: fetchEventSql: %@", querySQLString]];

const char* querySQL = [querySQLString UTF8String];
NSMutableArray<NSString *> *messageIds = [NSMutableArray array];
NSMutableArray<NSString *> *messages = [NSMutableArray array];
NSMutableArray<NSNumber *> *statusList = [NSMutableArray array];
NSMutableArray<NSNumber *> *dmProcessedList = [NSMutableArray array];

@synchronized (self) {
sqlite3_stmt *queryStmt = nil;
if ([database prepare_v2:querySQL nBytes:-1 ppStmt:&queryStmt pzTail:NULL] == SQLITE_OK) {
[RSLogger logDebug:@"RSDBPersistentManager: getEventsFromDB: Successfully fetched events from DB"];
while ([database step:queryStmt] == SQLITE_ROW) {
@autoreleasepool {
int messageId = [database column_int:queryStmt i:0];
const unsigned char* queryResultCol1 = [database column_text:queryStmt i:1];
NSString *message = [[NSString alloc] initWithUTF8String:(char *)queryResultCol1];
int status = [database column_int:queryStmt i:3];
int dmProcessed = [database column_int:queryStmt i:4];

[messageIds addObject:[[NSString alloc] initWithFormat:@"%d", messageId]];
[messages addObject:message];
[statusList addObject:[NSNumber numberWithInt:status]];
[dmProcessedList addObject:[NSNumber numberWithInt:dmProcessed]];
@try {
[RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSDBPersistentManager: getEventsFromDB: fetchEventSql: %@", querySQLString]];

const char* querySQL = [querySQLString UTF8String];
NSMutableArray<NSString *> *messageIds = [NSMutableArray array];
NSMutableArray<NSString *> *messages = [NSMutableArray array];
NSMutableArray<NSNumber *> *statusList = [NSMutableArray array];
NSMutableArray<NSNumber *> *dmProcessedList = [NSMutableArray array];

@synchronized (self) {
sqlite3_stmt *queryStmt = nil;
if ([database prepare_v2:querySQL nBytes:-1 ppStmt:&queryStmt pzTail:NULL] == SQLITE_OK) {
[RSLogger logDebug:@"RSDBPersistentManager: getEventsFromDB: Successfully fetched events from DB"];
while ([database step:queryStmt] == SQLITE_ROW) {
@autoreleasepool {
int messageId = [database column_int:queryStmt i:0];
const unsigned char* queryResultCol1 = [database column_text:queryStmt i:1];
NSString *message = [[NSString alloc] initWithUTF8String:(char *)queryResultCol1];
int status = [database column_int:queryStmt i:3];
int dmProcessed = [database column_int:queryStmt i:4];

[messageIds addObject:[[NSString alloc] initWithFormat:@"%d", messageId]];
[messages addObject:message];
[statusList addObject:[NSNumber numberWithInt:status]];
[dmProcessedList addObject:[NSNumber numberWithInt:dmProcessed]];
}
}
} else {
[RSLogger logError:@"RSDBPersistentManager: getEventsFromDB: Failed to fetch events from DB"];
}

// Finalize the SQL statement to free resources
if (queryStmt != nil) {
[database finalize:queryStmt];
}
} else {
[RSLogger logError:@"RSDBPersistentManager: getEventsFromDB: Failed to fetch events from DB"];
}

// Finalize the SQL statement to free resources
if (queryStmt != nil) {
[database finalize:queryStmt];
}
RSDBMessage *dbMessage = [[RSDBMessage alloc] init];
dbMessage.messageIds = messageIds;
dbMessage.messages = messages;
dbMessage.statusList = statusList;
dbMessage.dmProcessed = dmProcessedList;
return dbMessage;

} @catch (NSException *exception) {
[RSLogger logError:[NSString stringWithFormat:@"RSDBPersistentManager: getEventsFromDB: Failed to fetch events from DB, reason: %@", exception.reason]];
return nil;
saikumarrs marked this conversation as resolved.
Show resolved Hide resolved
}

RSDBMessage *dbMessage = [[RSDBMessage alloc] init];
dbMessage.messageIds = messageIds;
dbMessage.messages = messages;
dbMessage.statusList = statusList;
dbMessage.dmProcessed = dmProcessedList;
return dbMessage;
}

// If mode is passed as DEVICEMODE, this function would return the total number of events which were waiting for the Device Mode Processing to be done
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"version": "1.29.1",
"version": "1.29.2-beta",
"description": "Rudder is a platform for collecting, storing and routing customer event data to dozens of tools"
}
Loading