Skip to content

Commit

Permalink
fix: batches failed to be sent to data plane (#367)
Browse files Browse the repository at this point in the history
* fix: handled race condition generating invalid event payloads

* fix: deleted invalid events (without anonId)  generated due to race condition while sdk initialization

* fix: de-serializing events from db to Map while checking if they contain anonymousId

* chore: minor changes

---------

Co-authored-by: Desu Sai Venkat <[email protected]>
  • Loading branch information
desusai7 and Desu Sai Venkat authored Dec 13, 2023
1 parent 63cc9f1 commit 2a04e28
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,6 @@ public Long getSessionId() {
}

private RudderMessage updateMessageWithConsentedDestinations(RudderMessage message) {
RudderClient rudderClient = RudderClient.getInstance();
if (rudderClient == null)
return message;

if (consentFilterHandler == null) {
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/*
* Primary class to be used in client
Expand All @@ -36,6 +37,7 @@ public class RudderClient {
private static RudderOption defaultOptions;
private static String deviceToken;
private static String authToken;
private static ReentrantLock reentrantLock = new ReentrantLock();


private static final int NUMBER_OF_FLUSH_CALLS_IN_QUEUE = 1;
Expand Down Expand Up @@ -136,22 +138,25 @@ public static RudderClient getInstance(@NonNull Context context, @Nullable Strin
// get application from provided context
application = (Application) context.getApplicationContext();

// initiate RudderClient instance
instance = new RudderClient();
// initiate EventRepository class
if (application != null) {
RudderLogger.logVerbose("getInstance: creating EventRepository.");
EventRepository.Identifiers identifiers = new EventRepository
.Identifiers(writeKey, deviceToken, anonymousId, advertisingId, authToken);
repository = new EventRepository(application, config, identifiers);
reentrantLock.lock();
try {
// initiate RudderClient instance
instance = new RudderClient();
// initiate EventRepository class
if (application != null) {
RudderLogger.logVerbose("getInstance: creating EventRepository.");
EventRepository.Identifiers identifiers = new EventRepository
.Identifiers(writeKey, deviceToken, anonymousId, advertisingId, authToken);
repository = new EventRepository(application, config, identifiers);
}
} finally {
reentrantLock.unlock();
}

}
return instance;
}



private static void updateConfigWithValidValuesIfNecessary(@NonNull RudderConfig config) {
if (config.getFlushQueueSize() < 0 || config.getFlushQueueSize() > 100) {
RudderLogger.logVerbose("getInstance: FlushQueueSize is wrong. using default.");
Expand All @@ -168,12 +173,17 @@ private static void updateConfigWithValidValuesIfNecessary(@NonNull RudderConfig
}
}

/*
* package private api to be used in EventRepository
* */
/**
* API for getting instance of RudderClient, if already initiated. Otherwise returns null.
*
* @return instance of RudderClient
*/
@Nullable
public static RudderClient getInstance() {
return instance;
reentrantLock.lock();
RudderClient client = instance;
reentrantLock.unlock();
return client;
}

/**
Expand All @@ -186,12 +196,13 @@ public static RudderClient getInstance() {
*/
@NonNull
public static RudderClient with(@NonNull Context context) {
if (instance == null) {
RudderClient client = RudderClient.getInstance();
if (client == null) {
String writeKey = null;
writeKey = Utils.getWriteKeyFromStrings(context);
return getInstance(context, writeKey);
}
return instance;
return client;
}

/**
Expand Down Expand Up @@ -469,11 +480,11 @@ public void alias(String newId) {
*/
public void alias(@NonNull String newId, @Nullable RudderOption option) {
RudderContext context = getRudderContext();
Map<String, Object> traits= null;
Map<String, Object> traits = null;
if (context != null) {
traits = context.getTraits();
}
if(traits == null)
if (traits == null)
return;
String prevUserId = null;

Expand Down Expand Up @@ -587,7 +598,9 @@ public void group(@NonNull String groupId, @Nullable RudderTraits traits, @Nulla
* @param _instance RudderClient instance
*/
public static void setSingletonInstance(@NonNull RudderClient _instance) {
reentrantLock.lock();
instance = _instance;
reentrantLock.unlock();
}

/**
Expand Down Expand Up @@ -622,7 +635,7 @@ public static void updateWithAdvertisingId(@NonNull String advertisingId) {
* @param advertisingId IDFA for the device
*/
public static void putAdvertisingId(@NonNull String advertisingId) {
if (instance == null) {
if (RudderClient.getInstance() == null) {
// rudder sdk is not initialised yet. let's use the advertisingId from the beginning
RudderClient.advertisingId = advertisingId;
return;
Expand All @@ -639,7 +652,7 @@ public static void putAdvertisingId(@NonNull String advertisingId) {
* @param deviceToken Push Token from FCM
*/
public static void putDeviceToken(@NonNull String deviceToken) {
if (instance == null) {
if (RudderClient.getInstance() == null) {
// rudder sdk is not initialised yet. let's use the deviceToken from the beginning
RudderClient.deviceToken = deviceToken;
return;
Expand All @@ -666,7 +679,7 @@ public static void setAnonymousId(@NonNull String anonymousId) {
* @param anonymousId AnonymousId you want to use for the application
*/
public static void putAnonymousId(@NonNull String anonymousId) {
if (instance == null) {
if (RudderClient.getInstance() == null) {
// rudder sdk is not initialised yet. let's use the anonymousId from the beginning
RudderClient.anonymousId = anonymousId;
return;
Expand All @@ -688,7 +701,7 @@ public String getAnonymousId() {
}

public static void putAuthToken(@NonNull String authToken) {
if (instance == null) {
if (RudderClient.getInstance() == null) {
RudderClient.authToken = authToken;
return;
}
Expand All @@ -702,6 +715,7 @@ public static void putAuthToken(@NonNull String authToken) {

/**
* Reset SDK
*
* @deprecated Use {@link #reset(boolean) reset(false)} instead
*/
public void reset() {
Expand Down Expand Up @@ -841,6 +855,7 @@ public void endSession() {

/**
* Public method for getting the current session id.
*
* @return
*/
@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package com.rudderstack.android.sdk.core;

import static com.rudderstack.android.sdk.core.ReportManager.LABEL_TYPE;
import static com.rudderstack.android.sdk.core.ReportManager.incrementCloudModeUploadRetryCounter;
import static com.rudderstack.android.sdk.core.ReportManager.incrementDiscardedCounter;
import static com.rudderstack.android.sdk.core.RudderNetworkManager.NetworkResponses;
import static com.rudderstack.android.sdk.core.RudderNetworkManager.RequestMethod;
import static com.rudderstack.android.sdk.core.RudderNetworkManager.Result;
import static com.rudderstack.android.sdk.core.RudderNetworkManager.addEndPoint;

import com.rudderstack.android.sdk.core.gson.RudderGson;
import com.rudderstack.android.sdk.core.util.MessageUploadLock;
import com.rudderstack.android.sdk.core.util.Utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class RudderCloudModeManager {

Expand Down Expand Up @@ -79,6 +78,10 @@ public void run() {
case WRITE_KEY_ERROR:
RudderLogger.logError("CloudModeManager: cloudModeProcessor: Wrong WriteKey. Terminating the Cloud Mode Processor");
return;
case MISSING_ANONYMOUSID_AND_USERID:
RudderLogger.logError("CloudModeManager: cloudModeProcessor: Request Failed as the batch payload contains events without anonymousId and userId, hence deleting those events from DB");
deleteEventsWithoutAnonymousId(messages, messageIds);
break;
case ERROR:
case NETWORK_UNAVAILABLE:
RudderLogger.logWarn("CloudModeManager: cloudModeProcessor: Retrying in " + Math.abs(sleepCount - config.getSleepTimeOut()) + "s");
Expand All @@ -98,6 +101,20 @@ public void run() {
}.start();
}

private void deleteEventsWithoutAnonymousId(ArrayList<String> messages, ArrayList<Integer> messageIds) {
List<Integer> eventsToDelete = new ArrayList<>();
for (int i = 0; i < messages.size(); i++) {
Map<String, Object> message = RudderGson.getInstance().fromJson(messages.get(i), Map.class);
if (!message.containsKey("anonymousId") || message.get("anonymousId") == null) {
eventsToDelete.add(messageIds.get(i));
}
}
if (!eventsToDelete.isEmpty()) {
dbManager.clearEventsFromDB(eventsToDelete);
RudderLogger.logDebug(String.format(Locale.US, "CloudModeManager: deleteEventsWithoutUserIdAndAnonymousId: Deleted %d events from DB", eventsToDelete.size()));
}
}

/*
* check if the number of events in the db crossed the dbCountThreshold then delete the older events which are in excess.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum NetworkResponses {
SUCCESS,
ERROR,
WRITE_KEY_ERROR,
MISSING_ANONYMOUSID_AND_USERID,
RESOURCE_NOT_FOUND,
NETWORK_UNAVAILABLE,
BAD_REQUEST
Expand Down Expand Up @@ -150,6 +151,8 @@ private Result getResult(HttpURLConnection httpConnection) {
httpConnection.getURL(), responseCode, errorPayload));
if (errorPayload != null && errorPayload.toLowerCase().contains("invalid write key"))
networkResponse = NetworkResponses.WRITE_KEY_ERROR;
else if (errorPayload != null && errorPayload.toLowerCase().contains("request neither has anonymousid nor userid"))
networkResponse = NetworkResponses.MISSING_ANONYMOUSID_AND_USERID;
else if (responseCode == 404)
networkResponse = NetworkResponses.RESOURCE_NOT_FOUND;
else if (responseCode == 400)
Expand Down

0 comments on commit 2a04e28

Please sign in to comment.