Skip to content

Commit

Permalink
Merge pull request #88 from transifex/dynamodb-throughput
Browse files Browse the repository at this point in the history
Update DynamoDB throughput
  • Loading branch information
Nikos Vasileiou authored Feb 9, 2024
2 parents d70831b + 534d7a3 commit c12596d
Show file tree
Hide file tree
Showing 8 changed files with 978 additions and 2,792 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#####################
### builder image ###

FROM node:20.10.0-alpine as builder
FROM node:20.11.0-alpine as builder

ARG USER_ID
ARG GROUP_ID
Expand Down
3,607 changes: 885 additions & 2,722 deletions package-lock.json

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,48 @@
"homepage": "https://github.com/transifex/transifex-delivery",
"license": "Apache-2.0",
"dependencies": {
"@azure/identity": "^3.4.1",
"@azure/identity": "^3.4.2",
"@azure/storage-blob": "^12.17.0",
"@google-cloud/storage": "^6.12.0",
"@sentry/node": "^7.84.0",
"aws-sdk": "^2.1509.0",
"axios": "^1.6.2",
"@sentry/node": "^7.100.1",
"aws-sdk": "^2.1554.0",
"axios": "^1.6.7",
"axios-retry": "^4.0.0",
"body-parser": "^1.20.2",
"bull": "^4.11.5",
"bull": "^4.12.2",
"chai-http": "^4.4.0",
"compression": "^1.7.4",
"cors": "^2.8.5",
"dayjs": "^1.11.10",
"express": "^4.18.2",
"express-prom-bundle": "^6.6.0",
"express-prom-bundle": "^7.0.0",
"express-rate-limit": "^7.1.5",
"ioredis": "^5.3.2",
"joi": "^14.3.1",
"joi": "^17.12.1",
"lodash": "^4.17.21",
"morgan": "^1.10.0",
"nconf": "^0.12.1",
"nconf-yaml": "^1.0.2",
"newrelic": "^11.6.0",
"newrelic": "^11.10.3",
"node-cache": "^5.1.2",
"prom-client": "^15.0.0",
"prom-client": "^15.1.0",
"rate-limit-redis": "^4.2.0",
"rate-limiter-flexible": "^3.0.4",
"rate-limiter-flexible": "^4.0.1",
"uuid": "^8.3.2",
"winston": "^3.11.0",
"winston-transport": "^4.6.0",
"winston-transport": "^4.7.0",
"yargs": "^17.7.2"
},
"devDependencies": {
"chai": "^4.3.10",
"eslint": "^8.54.0",
"chai": "^4.4.1",
"eslint": "^8.56.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.29.0",
"mocha": "^10.2.0",
"nock": "^13.4.0",
"nodemon": "^3.0.1",
"eslint-plugin-import": "^2.29.1",
"mocha": "^10.3.0",
"nock": "^13.5.1",
"nodemon": "^3.0.3",
"nyc": "^15.1.0",
"sinon": "^17.0.1",
"supertest": "^6.3.3"
"supertest": "^6.3.4"
}
}
14 changes: 8 additions & 6 deletions src/helpers/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,35 @@ const PUSH_SOURCE_CONTENT_META_SCHEMA = joi.object().keys({
* @returns {Error} Throws an error only if there is one passed
*/
function handleResult(error, isWeb) {
if (error !== null) {
const { details } = error;
if (error) {
const details = error.details
? error.details.map((i) => i.message).join(',')
: '';
const err = new Error();
if (isWeb) {
err.message = 'Invalid Payload';
err.details = details;
err.status = 422;
err.code = 'invalid';
} else {
err.message = details.map((i) => i.message).join(',');
err.message = details;
}
throw err;
}
}

function validatePushSourceContentRoot(payload) {
const { error } = joi.validate(payload, PUSH_SOURCE_CONTENT_ROOT_SCHEMA);
const { error } = PUSH_SOURCE_CONTENT_ROOT_SCHEMA.validate(payload);
handleResult(error, true);
}

function validatePushSourceContentData(payload) {
const { error } = joi.validate(payload, PUSH_SOURCE_CONTENT_DATA_SCHEMA);
const { error } = PUSH_SOURCE_CONTENT_DATA_SCHEMA.validate(payload);
handleResult(error, true);
}

function validatePushSourceContentMeta(payload) {
const { error } = joi.validate(payload, PUSH_SOURCE_CONTENT_META_SCHEMA);
const { error } = PUSH_SOURCE_CONTENT_META_SCHEMA.validate(payload);
handleResult(error, true);
}

Expand Down
8 changes: 4 additions & 4 deletions src/middlewares/headers.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ async function validateAuth(req, res, next) {
const authKey = `auth:${req.token.project_token}`;
const clientToken = md5(req.token.original);

let serverToken = await registry.get(authKey);
let serverToken = await registry.get(authKey, { local: true });
if (!serverToken && req.token.project_secret) {
const lockKey = `disable:auth:${clientToken}`;
if (!(await registry.get(lockKey))) {
if (!(await registry.get(lockKey, { local: true }))) {
if (await syncer.verifyCredentials({ token: req.token })) {
// update authentication registry
await registry.set(authKey, clientToken, authCacheSec);
await registry.set(authKey, clientToken, authCacheSec, { local: true });
serverToken = clientToken;
logger.info(`Validated credentials for project: ${req.token.project_token}`);
} else {
// lock credentials to throttle requests
await registry.set(lockKey, 1, authCacheSec);
await registry.set(lockKey, 1, authCacheSec, { local: true });
logger.warn(`Invalid auth credentials: ${req.token.original}`);
}
}
Expand Down
40 changes: 24 additions & 16 deletions src/services/registry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ logger.info(`Registry strategy: ${config.get('settings:registry')}`);
* Delete a key from registry
*
* @param {String} key
* @param {Object} params (optional)
* @returns {Promise}
*/
function del(key) {
return registry.del(key);
function del(key, params) {
return registry.del(key, params);
}

/**
* Get registry value by key
*
* @param {String} key
* @param {Object} params (optional)
* @returns {Promise<value>}
*/
function get(key) {
return registry.get(key);
function get(key, params) {
return registry.get(key, params);
}

/**
Expand All @@ -30,19 +32,21 @@ function get(key) {
* @param {String} key
* @param {*} data
* @param {Number} expireSec (optional)
* @param {Object} params (optional)
* @returns {Promise}
*/
function set(key, data, expireSec) {
return registry.set(key, data, expireSec);
function set(key, data, expireSec, params) {
return registry.set(key, data, expireSec, params);
}

/**
* Find all keys in registry
*
* @param {Object} params (optional)
* @returns {Promise<Array[String]>}
*/
function findAll() {
return registry.findAll();
function findAll(params) {
return registry.findAll(params);
}

/**
Expand All @@ -51,10 +55,11 @@ function findAll() {
* @param {String} key
* @param {Number} increment
* @param {Number} expireSec (optional)
* @param {Object} params (optional)
* @returns {Promise}
*/
function incr(key, increment, expireSec) {
return registry.incr(key, increment, expireSec);
function incr(key, increment, expireSec, params) {
return registry.incr(key, increment, expireSec, params);
}

/**
Expand All @@ -63,31 +68,34 @@ function incr(key, increment, expireSec) {
* @param {String} key
* @param {*} value
* @param {Number} expireSec (optional)
* @param {Object} params (optional)
* @returns {Promise<Boolean>} - whether a new value was added
*/
function addToSet(key, value, expireSec) {
return registry.addToSet(key, value, expireSec);
function addToSet(key, value, expireSec, params) {
return registry.addToSet(key, value, expireSec, params);
}

/**
* Get all values from set
*
* @param {String} key
* @param {Object} params (optional)
* @returns {Promise<Array>}
*/
function listSet(key) {
return registry.listSet(key);
function listSet(key, params) {
return registry.listSet(key, params);
}

/**
* Get remaining TTL for key.
* Return 0 if key does not exist
*
* @param {String} key
* @param {Object} params (optional)
* @returns {Promise<Number>}
*/
function getTTLSec(key) {
return registry.getTTLSec(key);
function getTTLSec(key, params) {
return registry.getTTLSec(key, params);
}

/**
Expand Down
61 changes: 37 additions & 24 deletions src/services/registry/strategies/dynamodb-redis/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const SYNC_KEYS = {};
// auto refresh lookups in redis
function autoSync() {
let SYNC_PROCESSING = false;
setInterval(() => {

setInterval(async () => {
if (SYNC_PROCESSING) return;

const keys = _.keys(SYNC_KEYS);
Expand All @@ -17,28 +18,28 @@ function autoSync() {
logger.info(`DynamoDB-Redis: Syncing ${keys.length} keys`);

SYNC_PROCESSING = true;
Promise.all(_.map(keys, (key) => (async () => {
const syncValue = SYNC_KEYS[key];
const value = await dynamodb.get(key);
// remove from syncer
delete SYNC_KEYS[key];
// key no longer exists, delete it from redis
if (value === undefined) {
await redis.del(key);
return;
}
// value has not been modified, abort
if (JSON.stringify(value) === syncValue) {
return;

// Do this serially to avoid throttling errors from DynamoDB
for (let i = 0; i < keys.length; i += 1) {
try {
const key = keys[i];
const syncValue = SYNC_KEYS[key];
const value = await dynamodb.get(key);
// remove from syncer
delete SYNC_KEYS[key];
// key no longer exists, delete it from redis
if (value === undefined) {
await redis.del(key);
} else if (JSON.stringify(value) !== syncValue) {
const expireSec = await dynamodb.getTTLSec(key);
await redis.set(key, value, expireSec > 0 ? expireSec : undefined);
}
} catch (err) {
logger.error(err);
}
const expireSec = await dynamodb.getTTLSec(key);
await redis.set(key, value, expireSec > 0 ? expireSec : undefined);
})())).then(() => {
SYNC_PROCESSING = false;
}).catch((err) => {
logger.error(err);
SYNC_PROCESSING = false;
});
}

SYNC_PROCESSING = false;
}, 2000);
}

Expand All @@ -55,9 +56,15 @@ async function del(key) {
/**
* @implements {get}
*/
async function get(key) {
async function get(key, params) {
// Found in Redis... done
let value = await redis.get(key);

// Special flag to not propagate to DynamoDB
if (params && params.local === true) {
return value;
}

if (value !== undefined) {
SYNC_KEYS[key] = JSON.stringify(value);
return value;
Expand All @@ -75,7 +82,13 @@ async function get(key) {
/**
* @implements {set}
*/
async function set(key, data, expireSec) {
async function set(key, data, expireSec, params) {
// Special flag to not propagate to DynamoDB
if (params && params.local === true) {
await redis.set(key, data, expireSec);
return;
}

await Promise.all([
redis.set(key, data, expireSec),
dynamodb.set(key, data, expireSec),
Expand Down
2 changes: 1 addition & 1 deletion tests/routes/content.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ describe('POST /content', () => {
},
errors: [{
code: 'invalid',
detail: '[{"message":"\\"data\\" is required","path":["data"],"type":"any.required","context":{"key":"data","label":"data"}}]',
detail: '"\\"data\\" is required"',
status: '422',
title: 'Invalid Payload',
source: {},
Expand Down

0 comments on commit c12596d

Please sign in to comment.