diff --git a/batchOperations.js b/batchOperations.js index 33f81a8..4aa188e 100644 --- a/batchOperations.js +++ b/batchOperations.js @@ -10,12 +10,12 @@ var log_level = process.env['LOG_LEVEL'] || 'info'; const winston = require('winston'); const logger = winston.createLogger({ - level: debug === true ? 'debug' : log_level, - transports: [ - new winston.transports.Console({ - format: winston.format.simple() - }) - ] + level: debug === true ? 'debug' : log_level, + transports: [ + new winston.transports.Console({ + format: winston.format.simple() + }) + ] }); /** @@ -89,6 +89,138 @@ function getBatch(setRegion, s3Prefix, batchId, callback) { exports.getBatch = getBatch; +function cleanBatches(setRegion, s3Prefix, callback) { + init(setRegion); + cleanBatchesSegments(setRegion, s3Prefix, null, callback); +} + +function cleanBatchesSegments(setRegion, s3Prefix, lastEvaluatedKey, callback) { + // query for batches based on given s3Prefix + queryBatchByPrefix(setRegion, s3Prefix, lastEvaluatedKey, function (err, data) { + if (err) { + callback(err); + } else { + async.map(data.items, function (batchItem, asyncCallback) { + //clean found batches one by one + cleanBatch(setRegion, batchItem, function (err, data) { + if (err) { + asyncCallback(err); + } else { + asyncCallback(null, data); + } + }); + }, function (err, results) { + if (err) { + callback(err); + } else { + if (data.lastEvaluatedKey) { + return cleanBatchesSegments(setRegion, s3Prefix, data.lastEvaluatedKey, callback); + } else { + // deletions are completed + callback(null, { + batchCountDeleted: results.length, + batchesDeleted: results + }); + } + } + }); + } + }); +} + +function cleanBatch(setRegion, batchItem, callback) { + // delete batch entry + deleteBatch(batchItem.s3Prefix, batchItem.batchId, function (err, data) { + if (err) { + callback(err); + } else { + if ( !batchItem.entries || batchItem.entries.length <= 0) { + callback(null, data); + } else { + //delete related entries in filesTable + async.map(batchItem.entries, function (processedFile, asyncCallback) { + common.deleteFile(dynamoDB, setRegion, processedFile, function (err, data) { + if (err) { + asyncCallback(err); + } else { + asyncCallback(null, data); + } + }); + }, function (err, results) { + if (err) { + callback(err); + } else { + data.processedFilesCountDeleted = results.length; + data.processedFilesDeleted = results; + callback(null, data); + } + }); + } + } + }); +} + +function queryBatchByPrefix(setRegion, s3Prefix, lastEvaluatedKey, callback) { + init(setRegion); + var keyConditionExpression = null; + var keyConditionNames = null; + var keyConditionValues = null; + + queryParams = { + TableName: batchTable + }; + if (lastEvaluatedKey) { + queryParams.ExclusiveStartKey = lastEvaluatedKey; + } + + keyConditionExpression = "#s3Prefix = :s3Prefix"; + // add s3Prefix + keyConditionNames = { + "#s3Prefix": "s3Prefix" + }; + keyConditionValues = { + ":s3Prefix": { + "S": "" + s3Prefix + } + }; + + queryParams.KeyConditionExpression = keyConditionExpression; + queryParams.ExpressionAttributeNames = keyConditionNames; + queryParams.ExpressionAttributeValues = keyConditionValues; + + if (debug == true) { + console.log(queryParams); + } + + dynamoDB.query(queryParams, function (err, data) { + if (err) { + callback(err); + } else { + if (data && data.Items) { + var itemsToShow = []; + + data.Items.map(function (item) { + toShow = { + s3Prefix: item.s3Prefix.S, + batchId: item.batchId.S, + status: item.status.S, + entries: item.entries.SS, + lastUpdateDate: common.readableTime(item.lastUpdate.N), + lastUpdate: item.lastUpdate.N + }; + itemsToShow.push(toShow); + }); + + callback(null, {items: itemsToShow, lastEvaluatedKey: data.LastEvaluatedKey}); + } else { + callback(null, []); + } + } + }); +} + +exports.cleanBatches = cleanBatches; + /** * Function which performs a batch query with the provided arguments * @@ -408,4 +540,4 @@ function updateBatchStatus(s3Prefix, thisBatchId, status, requireStatusArray, up }); }; -exports.updateBatchStatus = updateBatchStatus; \ No newline at end of file +exports.updateBatchStatus = updateBatchStatus; diff --git a/cleanBatches.js b/cleanBatches.js new file mode 100644 index 0000000..d1c56b4 --- /dev/null +++ b/cleanBatches.js @@ -0,0 +1,18 @@ +var batchOperations = require("./batchOperations"); + +var args = require('minimist')(process.argv.slice(2)); + +var setRegion = args.region; +var s3Prefix = args.s3Prefix; + +batchOperations.cleanBatches(setRegion, s3Prefix, function (err, data) { + if (err) { + console.log("Error: " + err); + process.exit(-1); + } else { + console.log("OK: Deletion of " + data.batchCountDeleted + " Batches"); + console.log("Deleted Batch Information:"); + console.log(JSON.stringify(data)); + + } +}) diff --git a/common.js b/common.js index 30f9832..a4e02a7 100644 --- a/common.js +++ b/common.js @@ -97,10 +97,7 @@ function createTables(dynamoDB, callback) { KeyType: 'HASH' }], TableName: filesTable, - ProvisionedThroughput: { - ReadCapacityUnits: 1, - WriteCapacityUnits: 5 - } + BillingMode: "PAY_PER_REQUEST" }; var configKey = s3prefix; var configSpec = { @@ -113,10 +110,7 @@ function createTables(dynamoDB, callback) { KeyType: 'HASH' }], TableName: configTable, - ProvisionedThroughput: { - ReadCapacityUnits: 1, - WriteCapacityUnits: 5 - } + BillingMode: "PAY_PER_REQUEST" }; var batchKey = batchId; @@ -143,10 +137,7 @@ function createTables(dynamoDB, callback) { KeyType: 'RANGE' }], TableName: batchTable, - ProvisionedThroughput: { - ReadCapacityUnits: 1, - WriteCapacityUnits: 5 - }, + BillingMode: "PAY_PER_REQUEST", GlobalSecondaryIndexes: [{ IndexName: batchStatusGSI, KeySchema: [{ @@ -158,10 +149,6 @@ function createTables(dynamoDB, callback) { }], Projection: { ProjectionType: 'ALL' - }, - ProvisionedThroughput: { - ReadCapacityUnits: 1, - WriteCapacityUnits: 5 } }] }; @@ -459,65 +446,71 @@ function getS3Arn(bucket, prefix) { exports.getS3Arn = getS3Arn; + function ensureS3InvokePermisssions(lambda, bucket, prefix, functionName, functionArn, callback) { - lambda.getPolicy({ - FunctionName: functionName - }, function (err, data) { - if (err && err.code !== 'ResourceNotFoundException') { - callback(err); - } + var skipCheck = process.env.SKIP_LAMBDA_BUCKET_PERMISSION_CHECK ? Boolean(process.env.SKIP_LAMBDA_BUCKET_PERMISSION_CHECK) : false; + if ( skipCheck ) { + callback(); + } else { + lambda.getPolicy({ + FunctionName: functionName + }, function (err, data) { + if (err && err.code !== 'ResourceNotFoundException') { + callback(err); + } - var foundMatch = false; - var s3Arn = getS3Arn(bucket); - var sourceAccount = functionArn.split(":")[4]; - - // process the existing permissions policy if there is one - if (data && data.Policy) { - var statements = JSON.parse(data.Policy).Statement; - - statements.map(function (item) { - try { - // check that the source s3 bucket has rights to invoke the function in the correct source account and for the correct bucket - if (item.Principal === "s3.amazonaws.com" && - item.Action === "lambda.InvokeFunction" && - item.Resource === functionArn && - item.Condition.StringEquals['AWS:SourceAccount'] === sourceAccount && - item.Condition.ArnLike['AWS:SourceArn'] === s3Arn) { - foundMatch = true; - } - } catch (e) { - // this is OK - just means that the policy structure doesn't - // match the above format + var foundMatch = false; + var s3Arn = getS3Arn(bucket); + var sourceAccount = functionArn.split(":")[4]; + + // process the existing permissions policy if there is one + if (data && data.Policy) { + var statements = JSON.parse(data.Policy).Statement; + + statements.map(function (item) { + try { + // check that the source s3 bucket has rights to invoke the function in the correct source account and for the correct bucket + if (item.Principal === "s3.amazonaws.com" && + item.Action === "lambda.InvokeFunction" && + item.Resource === functionArn && + item.Condition.StringEquals['AWS:SourceAccount'] === sourceAccount && + item.Condition.ArnLike['AWS:SourceArn'] === s3Arn) { + foundMatch = true; + } + } catch (e) { + // this is OK - just means that the policy structure doesn't + // match the above format - } - }); - } + } + }); + } - if (foundMatch === true) { - logger.info("Found existing Policy match for S3 path to invoke " + functionName); - callback(); - } else { - var lambdaPermissions = { - Action: "lambda:InvokeFunction", - FunctionName: functionName, - Principal: "s3.amazonaws.com", - // only use internal account sources - SourceAccount: sourceAccount, - SourceArn: s3Arn, - StatementId: uuid.v4() - }; + if (foundMatch === true) { + logger.info("Found existing Policy match for S3 path to invoke " + functionName); + callback(); + } else { + var lambdaPermissions = { + Action: "lambda:InvokeFunction", + FunctionName: functionName, + Principal: "s3.amazonaws.com", + // only use internal account sources + SourceAccount: sourceAccount, + SourceArn: s3Arn, + StatementId: uuid.v4() + }; - lambda.addPermission(lambdaPermissions, function (err, data) { - if (err) { - logger.error(err); - callback(err); - } else { - logger.info("Granted S3 permission to invoke " + functionArn); - callback(); - } - }); - } - }); + lambda.addPermission(lambdaPermissions, function (err, data) { + if (err) { + logger.error(err); + callback(err); + } else { + logger.info("Granted S3 permission to invoke " + functionArn); + callback(); + } + }); + } + }); + } } exports.ensureS3InvokePermisssions = ensureS3InvokePermisssions; @@ -870,4 +863,4 @@ function reprocessFile(dynamoDB, s3, region, file, callback) { }); } -exports.reprocessFile = reprocessFile; \ No newline at end of file +exports.reprocessFile = reprocessFile; diff --git a/dist/AWSLambdaRedshiftLoader-2.7.8.zip b/dist/AWSLambdaRedshiftLoader-2.7.8.zip index 5f0c9b9..24c00ae 100644 Binary files a/dist/AWSLambdaRedshiftLoader-2.7.8.zip and b/dist/AWSLambdaRedshiftLoader-2.7.8.zip differ diff --git a/kmsCrypto.js b/kmsCrypto.js index cd7af96..ccddaa7 100644 --- a/kmsCrypto.js +++ b/kmsCrypto.js @@ -27,7 +27,7 @@ var authContext = { }; // module key alias to be used for this application -var moduleKeyName = "alias/LambdaRedshiftLoaderKey"; +var moduleKeyName = process.env.S3_REDSHIFT_LOADER_KMS_KEY_NAME || "alias/LambdaRedshiftLoaderKey"; function setRegion(region) { if (!region) {