Skip to content

Commit

Permalink
storage method in bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Rıza Kat committed Sep 19, 2024
1 parent c572bf1 commit b4a96bf
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
25 changes: 14 additions & 11 deletions lib/countly-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var cc = require("./countly-common");
var BulkUser = require("./countly-bulk-user");
var CountlyStorage = require("./countly-storage");

var storageMethod;

/**
* @lends module:lib/countly-bulk
* Initialize CountlyBulk server object
Expand Down Expand Up @@ -102,7 +104,8 @@ function CountlyBulk(conf) {
conf.maxStackTraceLinesPerThread = conf.max_stack_trace_lines_per_thread || maxStackTraceLinesPerThread;
conf.maxStackTraceLineLength = conf.max_stack_trace_line_length || maxStackTraceLineLength;

CountlyStorage.setStoragePath(conf.storage_path, true, conf.persist_queue);
// config time memory only option will be added
storageMethod = CountlyStorage.setStorage(conf.storage_path, false, true, conf.persist_queue);

this.conf = conf;
/**
Expand Down Expand Up @@ -142,7 +145,7 @@ function CountlyBulk(conf) {

requestQueue.push(query);
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Adding request to the queue.");
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -190,7 +193,7 @@ function CountlyBulk(conf) {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, adding the request into queue.");
requestQueue.push(query);
}
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -245,7 +248,7 @@ function CountlyBulk(conf) {
eventQueue[device_id] = [];
}
eventQueue[device_id].push(e);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
storageMethod.storeSet("cly_bulk_event", eventQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, `CountlyBulk add_event, Sending message to the parent process. Adding event: [${event.key}].`);
Expand Down Expand Up @@ -343,7 +346,7 @@ function CountlyBulk(conf) {
*/
function toBulkRequestQueue(bulkRequest) {
bulkQueue.push(bulkRequest);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
storageMethod.storeSet("cly_bulk_queue", bulkQueue);
}
var self = this;

Expand All @@ -369,7 +372,7 @@ function CountlyBulk(conf) {
}
if (eventChanges) {
isEmpty = false;
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
storageMethod.storeSet("cly_bulk_event", eventQueue);
}

// process request queue into bulk requests
Expand All @@ -383,7 +386,7 @@ function CountlyBulk(conf) {
var requests = requestQueue.splice(0, conf.bulk_size);
toBulkRequestQueue({ app_key: conf.app_key, requests: JSON.stringify(requests) });
}
CountlyStorage.storeSet("cly_req_queue", requestQueue);
storageMethod.storeSet("cly_req_queue", requestQueue);
}

// process bulk request queue
Expand All @@ -398,7 +401,7 @@ function CountlyBulk(conf) {
bulkQueue.unshift(res);
failTimeout = cc.getTimestamp() + conf.fail_timeout;
}
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
storageMethod.storeSet("cly_bulk_queue", bulkQueue);
readyToProcess = true;
}, "heartBeat", false);
}
Expand Down Expand Up @@ -591,9 +594,9 @@ function CountlyBulk(conf) {
worker.on("message", handleWorkerMessage);
});

var requestQueue = CountlyStorage.storeGet("cly_req_queue", []);
var eventQueue = CountlyStorage.storeGet("cly_bulk_event", {});
var bulkQueue = CountlyStorage.storeGet("cly_bulk_queue", []);
var requestQueue = storageMethod.storeGet("cly_req_queue", []);
var eventQueue = storageMethod.storeGet("cly_bulk_event", {});
var bulkQueue = storageMethod.storeGet("cly_bulk_queue", []);
}

module.exports = CountlyBulk;
2 changes: 1 addition & 1 deletion lib/countly.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ Countly.Bulk = Bulk;
// Common module debug value is set to init time debug value
cc.debug = conf.debug;

// Set the storage path
// Set the storage method and path
storageMethod = CountlyStorage.setStorage(conf.storage_path);

// clear stored device ID if flag is set
Expand Down

0 comments on commit b4a96bf

Please sign in to comment.