From 4e1de90cda0643ba91f3c6d76b1b92b7003203a3 Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Wed, 20 Dec 2017 22:57:55 -0500 Subject: [PATCH 1/6] add initial sql schema & scripts, libs, cutover feeds --- database/dump_database.sh | 34 ++++++++++++++++++++++++++ database/make_fresh_deploy.sh | 24 +++++++++++++++++++ database/once_tweet/Feeds.sql | 12 ++++++++++ database/once_tweet/Meta.sql | 8 +++++++ package.json | 8 +++++++ src/Feed.js | 1 + src/Feeds.js | 45 +++++++++++++++++++---------------- src/db.js | 36 ++++++++++++++++++++++++++++ 8 files changed, 147 insertions(+), 21 deletions(-) create mode 100755 database/dump_database.sh create mode 100755 database/make_fresh_deploy.sh create mode 100644 database/once_tweet/Feeds.sql create mode 100644 database/once_tweet/Meta.sql create mode 100644 src/db.js diff --git a/database/dump_database.sh b/database/dump_database.sh new file mode 100755 index 0000000..4cd1a7d --- /dev/null +++ b/database/dump_database.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +if [ $# -lt 3 ]; +then + echo "Usage: $(basename $0) []" && exit 1 +fi + +if [ $# -eq 3 ]; +then + echo -n "DB password: " + read -s DB_pass +else + DB_pass=$4 +fi + +DB_host=$1 +DB_user=$2 +DB=$3 + +test -d $DB || mkdir -p $DB + +echo +echo "Dumping tables into separate SQL command files for database '$DB'" + +tbl_count=0 + +for t in $(mysql -NBA -h $DB_host -u $DB_user -p$DB_pass -D $DB -e 'show tables') +do + echo "DUMPING TABLE: $t" + mysqldump --compact --no-data -h $DB_host -u $DB_user -p$DB_pass $DB $t | sed 's/ AUTO_INCREMENT=[0-9]*//g' | sed -e 's/DEFINER[ ]*=[ ]*[^*]*\*/\*/' > $DB/$t.sql + (( tbl_count++ )) +done + +echo "$tbl_count tables dumped from database '$DB'" diff --git a/database/make_fresh_deploy.sh b/database/make_fresh_deploy.sh new file mode 100755 index 0000000..5919a54 --- /dev/null +++ b/database/make_fresh_deploy.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +[ $# -lt 1 ] && echo "Usage: $(basename $0) " && exit 1 + +DB=$1 + +echo "Creating initial dump script for '$DB'" + +cat <(echo "SET FOREIGN_KEY_CHECKS=0;") > fresh_deploy_$DB.sql + +cat <(echo "CREATE DATABASE $DB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;;") >> fresh_deploy_$DB.sql + +cat <(echo "USE $DB;") >> fresh_deploy_$DB.sql + +FILES=$DB/*.sql +for f in $FILES +do + if [ "$f" == "$DB/RefData.sql" ] + then + continue; + fi + echo "Processing $f..." + cat $f >> fresh_deploy_$DB.sql +done diff --git a/database/once_tweet/Feeds.sql b/database/once_tweet/Feeds.sql new file mode 100644 index 0000000..80160c0 --- /dev/null +++ b/database/once_tweet/Feeds.sql @@ -0,0 +1,12 @@ +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `Feeds` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `Name` text COLLATE utf8mb4_unicode_ci NOT NULL, + `Handle` text COLLATE utf8mb4_unicode_ci NOT NULL, + `ScreenNames` text COLLATE utf8mb4_unicode_ci NOT NULL, + `Key` text COLLATE utf8mb4_unicode_ci NOT NULL, + `Secret` text COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; diff --git a/database/once_tweet/Meta.sql b/database/once_tweet/Meta.sql new file mode 100644 index 0000000..733a2da --- /dev/null +++ b/database/once_tweet/Meta.sql @@ -0,0 +1,8 @@ +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `Meta` ( + `Name` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL, + `Value` text COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`Name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; diff --git a/package.json b/package.json index 36c10b9..22351b0 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,20 @@ { "name": "once-tweet", "main": "index.js", + "private": true, + "repository": { + "type": "git", + "url": "git+https://github.com/sedenardi/once-tweet.git" + }, + "author": "Sanders DeNardi", + "homepage": "https://github.com/sedenardi/once-tweet#readme", "dependencies": { "aws-sdk": "^2.7.27", "bignumber.js": "^4.0.0", "cheerio": "^0.22.0", "lodash": "^4.17.4", "moment": "^2.17.1", + "mysql2": "^1.5.1", "request": "^2.79.0", "twitter": "^1.7.0" }, diff --git a/src/Feed.js b/src/Feed.js index 73374d8..ff108f7 100644 --- a/src/Feed.js +++ b/src/Feed.js @@ -54,6 +54,7 @@ class Feed { } Feed.parse = function(feed, meta) { feed.Since = _.find(meta, { Name: 'since_id' }).Value; + feed.ScreenNames = feed.ScreenNames.split(','); return new Feed(feed); }; diff --git a/src/Feeds.js b/src/Feeds.js index 756fe01..338a530 100644 --- a/src/Feeds.js +++ b/src/Feeds.js @@ -4,29 +4,32 @@ const moment = require('moment'); const _ = require('lodash'); const BigNumber = require('bignumber.js'); const Feed = require('./Feed'); -const dynamo = require('./dynamo')(); +const db = require('./db')(); + +const feedsQuery = () => { return db.query('select * from once_tweet.Feeds;'); }; +const metaQuery = () => { return db.query('select * from once_tweet.Meta;'); }; class Feeds { constructor(feeds) { this.Feeds = feeds.Feeds; } save(maxes) { - const compacted = _(maxes) - .compact() - .map((s) => { return s.toString(); }) - .value(); - if (compacted.length) { - const max = BigNumber.max(compacted); - return dynamo.put({ - TableName: 'once_Meta', - Item: { - Name: 'since_id', - Value: max.toString() - } - }); - } else { - return Promise.resolve(); - } + // const compacted = _(maxes) + // .compact() + // .map((s) => { return s.toString(); }) + // .value(); + // if (compacted.length) { + // const max = BigNumber.max(compacted); + // return dynamo.put({ + // TableName: 'once_Meta', + // Item: { + // Name: 'since_id', + // Value: max.toString() + // } + // }); + // } else { + // return Promise.resolve(); + // } } run() { const runs = this.Feeds.map((f) => { return f.run(); }); @@ -37,11 +40,11 @@ class Feeds { } Feeds.get = function() { return Promise.all([ - dynamo.scan({ TableName: 'once_Feeds' }), - dynamo.scan({ TableName: 'once_Meta' }) + feedsQuery(), + metaQuery() ]).then((res) => { - const feeds = res[0].Items.map((v) => { - return Feed.parse(v, res[1].Items); + const feeds = res[0].map((v) => { + return Feed.parse(v, res[1]); }); return Promise.resolve(new Feeds({ Feeds: feeds })); }); diff --git a/src/db.js b/src/db.js new file mode 100644 index 0000000..cc8a48c --- /dev/null +++ b/src/db.js @@ -0,0 +1,36 @@ +'use strict'; + +const config = require('../config'); +const mysql = require('mysql2'); + +module.exports = function() { + const pool = mysql.createPool(config.mysql); + return { + query: function(sql, params) { + return new Promise((resolve, reject) => { + if (typeof sql !== 'string') { + return reject(new Error('Missing query string')); + } + if (params !== undefined && params.constructor !== Array) { + return reject(new Error('Params must be an array')); + } + pool.getConnection((err, conn) => { + if (err) { return reject(err); } + conn.query(sql, params, (error, res) => { + conn.release(); + if (error) { return reject(error); } + return resolve(res); + }); + }); + }); + }, + end: function() { + return new Promise((resolve, reject) => { + pool.end((err) => { + if (err) { return reject(err); } + return resolve(); + }); + }); + } + }; +}; From 5f5cad6d9c5caff839b8fa66e3175196fe2e7192 Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Thu, 21 Dec 2017 10:55:30 -0500 Subject: [PATCH 2/6] cutover items to sql, pass along DB to close at end --- database/once_tweet/Items.sql | 14 +++++++++++ index.js | 7 +++--- src/Feed.js | 8 +++--- src/FeedItem.js | 36 +++++++++++++++------------ src/Feeds.js | 46 +++++++++++++++++------------------ src/twitter.js | 2 +- 6 files changed, 66 insertions(+), 47 deletions(-) create mode 100644 database/once_tweet/Items.sql diff --git a/database/once_tweet/Items.sql b/database/once_tweet/Items.sql new file mode 100644 index 0000000..95d72fa --- /dev/null +++ b/database/once_tweet/Items.sql @@ -0,0 +1,14 @@ +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `Items` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `Hash` char(40) COLLATE utf8mb4_unicode_ci NOT NULL, + `ScreenName` varchar(15) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `id_str` varchar(24) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `Url` text COLLATE utf8mb4_unicode_ci, + `created_at` int(10) unsigned NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `Hash` (`Hash`), + KEY `created_at` (`created_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; diff --git a/index.js b/index.js index 5782fe5..7b5ebe2 100644 --- a/index.js +++ b/index.js @@ -1,12 +1,13 @@ 'use strict'; const Feeds = require('./src/Feeds'); +const db = require('./src/db')(); const run = function() { - return Feeds.get().then((feeds) => { + return Feeds.get(db).then((feeds) => { return feeds.run(); - }).catch((err) => { - console.log(err); + }).then(() => { + return db.end(); }); }; diff --git a/src/Feed.js b/src/Feed.js index ff108f7..14374c5 100644 --- a/src/Feed.js +++ b/src/Feed.js @@ -16,6 +16,7 @@ class Feed { this.Secret = feed.Secret; this.Since = feed.Since; + this.db = feed.db; } run() { const twit = twitter(this); @@ -25,7 +26,7 @@ class Feed { return Promise.all(fetches).then((res) => { const itemActions = _(res) .flatten() - .map((t) => { return FeedItem.parse(t); }) + .map((t) => { return FeedItem.parse(t, this.db); }) .value(); return Promise.all(itemActions); }).then((res) => { @@ -37,7 +38,7 @@ class Feed { console.log(`${items.length} items in ${this.Name}`); const seq = items.reduce((r, v, i) => { const last = i === (items.length - 1); - r = r.then(() => { return v.run(twit, last); }); + r = r.then(() => { return v.run(/*twit*/null, last); }); return r; }, Promise.resolve()); return seq.then(() => { @@ -52,9 +53,10 @@ class Feed { }); } } -Feed.parse = function(feed, meta) { +Feed.parse = function(feed, meta, db) { feed.Since = _.find(meta, { Name: 'since_id' }).Value; feed.ScreenNames = feed.ScreenNames.split(','); + feed.db = db; return new Feed(feed); }; diff --git a/src/FeedItem.js b/src/FeedItem.js index d1a44ee..c367671 100644 --- a/src/FeedItem.js +++ b/src/FeedItem.js @@ -3,30 +3,31 @@ const moment = require('moment'); const _ = require('lodash'); const req = require('./req'); -const dynamo = require('./dynamo')(); +const crypto = require('crypto'); class FeedItem { constructor(item) { this.id_str = item.id_str; + this.handle = item.handle; this.Url = item.Url; + this.Hash = item.Hash; this.created_at = item.created_at; + this.db = item.db; } save() { - return dynamo.put({ - TableName: 'once_Items', - Item: { - Url: this.Url, - created_at: this.created_at - } - }); + const sql = 'insert into once_tweet.Items(`Hash`,ScreenName,id_str,Url,created_at) value(?,?,?,?,?);'; + return this.db.query(sql, [ + this.Hash, + this.handle, + this.id_str, + this.Url, + this.created_at.toString().slice(0, -3) + ]); } run(twit, last) { - return dynamo.get({ - TableName: 'once_Items', - Key: { Url: this.Url }, - ConsistentRead: true - }).then((res) => { - if (res.Item) { + const sql = 'select * from once_tweet.Items where `Hash` = ?;'; + return this.db.query(sql, [this.Hash]).then((res) => { + if (res[0]) { return Promise.resolve(); } return this.save().then(() => { @@ -37,10 +38,12 @@ class FeedItem { }); } } -FeedItem.parse = function(rawItem) { +FeedItem.parse = function(rawItem, db) { const item = { id_str: rawItem.id_str, - created_at: moment(rawItem.created_at, 'dd MMM DD HH:mm:ss ZZ YYYY').valueOf() + handle: rawItem.user.screen_name, + created_at: moment(rawItem.created_at, 'dd MMM DD HH:mm:ss ZZ YYYY').valueOf(), + db: db }; if (rawItem.entities.urls.length) { @@ -53,6 +56,7 @@ FeedItem.parse = function(rawItem) { if (!url) { return Promise.resolve(null); } return req.head(url).then((resolvedUrl) => { item.Url = resolvedUrl.split(/[?#]/)[0].toLowerCase(); + item.Hash = crypto.createHash('sha1').update(item.Url).digest('hex'); return Promise.resolve(new FeedItem(item)); }); }); diff --git a/src/Feeds.js b/src/Feeds.js index 338a530..c719eb5 100644 --- a/src/Feeds.js +++ b/src/Feeds.js @@ -4,32 +4,30 @@ const moment = require('moment'); const _ = require('lodash'); const BigNumber = require('bignumber.js'); const Feed = require('./Feed'); -const db = require('./db')(); -const feedsQuery = () => { return db.query('select * from once_tweet.Feeds;'); }; -const metaQuery = () => { return db.query('select * from once_tweet.Meta;'); }; +const feedsQuery = (db) => { return db.query('select * from once_tweet.Feeds;'); }; +const metaQuery = (db) => { return db.query('select * from once_tweet.Meta;'); }; +const metaUpdate = (db, max) => { + const sql = 'update once_tweet.Meta set `Value` = ? where `Name` = \'since_id\''; + return db.query(sql, [max]); +}; class Feeds { constructor(feeds) { + this.db = feeds.db; this.Feeds = feeds.Feeds; } save(maxes) { - // const compacted = _(maxes) - // .compact() - // .map((s) => { return s.toString(); }) - // .value(); - // if (compacted.length) { - // const max = BigNumber.max(compacted); - // return dynamo.put({ - // TableName: 'once_Meta', - // Item: { - // Name: 'since_id', - // Value: max.toString() - // } - // }); - // } else { - // return Promise.resolve(); - // } + const compacted = _(maxes) + .compact() + .map((s) => { return s.toString(); }) + .value(); + if (compacted.length) { + const max = BigNumber.max(compacted); + return metaUpdate(this.db, max.toString()); + } else { + return Promise.resolve(); + } } run() { const runs = this.Feeds.map((f) => { return f.run(); }); @@ -38,15 +36,15 @@ class Feeds { }); } } -Feeds.get = function() { +Feeds.get = function(db) { return Promise.all([ - feedsQuery(), - metaQuery() + feedsQuery(db), + metaQuery(db) ]).then((res) => { const feeds = res[0].map((v) => { - return Feed.parse(v, res[1]); + return Feed.parse(v, res[1], db); }); - return Promise.resolve(new Feeds({ Feeds: feeds })); + return Promise.resolve(new Feeds({ db: db, Feeds: feeds })); }); }; diff --git a/src/twitter.js b/src/twitter.js index a695547..ec661f5 100644 --- a/src/twitter.js +++ b/src/twitter.js @@ -30,7 +30,7 @@ module.exports = function(feed) { const retweet = function(id, last) { return client.post(`statuses/retweet/${id}`, {}).then(() => { - console.log(`Tweeting from ${feed.Handle}`); + console.log(`Tweeting ${id} from ${feed.Handle}`); return !last ? sleep() : Promise.resolve(); }); }; From 12b607b88e97d2ee4fee99fe018f3c89c30e274d Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Thu, 21 Dec 2017 10:56:47 -0500 Subject: [PATCH 3/6] remove dynamo and aws-sdk --- package.json | 1 - src/dynamo.js | 51 --------------------------------------------------- 2 files changed, 52 deletions(-) delete mode 100644 src/dynamo.js diff --git a/package.json b/package.json index 22351b0..de75353 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,6 @@ "author": "Sanders DeNardi", "homepage": "https://github.com/sedenardi/once-tweet#readme", "dependencies": { - "aws-sdk": "^2.7.27", "bignumber.js": "^4.0.0", "cheerio": "^0.22.0", "lodash": "^4.17.4", diff --git a/src/dynamo.js b/src/dynamo.js deleted file mode 100644 index f0a50ff..0000000 --- a/src/dynamo.js +++ /dev/null @@ -1,51 +0,0 @@ -'use strict'; - -const config = require('../config'); - -module.exports = function() { - const AWS = config.AWS(); - const dynamodb = new AWS.DynamoDB(); - const docClient = new AWS.DynamoDB.DocumentClient(); - return { - get: function(query) { - return new Promise((resolve, reject) => { - docClient.get(query, (err, res) => { - if (err) { return reject(err); } - return resolve(res); - }); - }); - }, - getItem: function(query) { - return new Promise((resolve, reject) => { - dynamodb.getItem(query, (err, res) => { - if (err) { return reject(err); } - return resolve(res); - }); - }); - }, - scan: function(query) { - return new Promise((resolve, reject) => { - docClient.scan(query, (err, res) => { - if (err) { return reject(err); } - return resolve(res); - }); - }); - }, - put: function(params) { - return new Promise((resolve, reject) => { - docClient.put(params, (err, res) => { - if (err) { return reject(err); } - return resolve(res); - }); - }); - }, - batchWrite: function(params) { - return new Promise((resolve, reject) => { - docClient.batchWrite(params, (err, res) => { - if (err) { return reject(err); } - return resolve(res); - }); - }); - } - }; -}; From ffe45d8739db1e2b5d8903023f6f53886cbfd533 Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Thu, 21 Dec 2017 11:08:57 -0500 Subject: [PATCH 4/6] add cleanup step --- index.js | 4 +++- src/Feeds.js | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 7b5ebe2..2c88443 100644 --- a/index.js +++ b/index.js @@ -5,7 +5,9 @@ const db = require('./src/db')(); const run = function() { return Feeds.get(db).then((feeds) => { - return feeds.run(); + return feeds.run().then(() => { + return feeds.cleanup(); + }); }).then(() => { return db.end(); }); diff --git a/src/Feeds.js b/src/Feeds.js index c719eb5..a40039b 100644 --- a/src/Feeds.js +++ b/src/Feeds.js @@ -35,6 +35,11 @@ class Feeds { return this.save(maxes); }); } + cleanup() { + const threshold = moment().subtract(3, 'months').unix(); + const sql = 'delete from once_tweet.Items where created_at < ?'; + return db.query(sql, [threshold]); + } } Feeds.get = function(db) { return Promise.all([ From 311264d36597af093a9a4779e1684fe3e177ebd1 Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Thu, 21 Dec 2017 11:17:26 -0500 Subject: [PATCH 5/6] fix cleanup query, enable twitter --- src/Feed.js | 2 +- src/Feeds.js | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Feed.js b/src/Feed.js index 14374c5..f00e8a9 100644 --- a/src/Feed.js +++ b/src/Feed.js @@ -38,7 +38,7 @@ class Feed { console.log(`${items.length} items in ${this.Name}`); const seq = items.reduce((r, v, i) => { const last = i === (items.length - 1); - r = r.then(() => { return v.run(/*twit*/null, last); }); + r = r.then(() => { return v.run(twit, last); }); return r; }, Promise.resolve()); return seq.then(() => { diff --git a/src/Feeds.js b/src/Feeds.js index a40039b..b19a0e4 100644 --- a/src/Feeds.js +++ b/src/Feeds.js @@ -38,7 +38,10 @@ class Feeds { cleanup() { const threshold = moment().subtract(3, 'months').unix(); const sql = 'delete from once_tweet.Items where created_at < ?'; - return db.query(sql, [threshold]); + return this.db.query(sql, [threshold]).then((res) => { + console.log(`${res.affectedRows} rows cleaned up.`); + return Promise.resolve(); + }); } } Feeds.get = function(db) { From 48a0e53ea0445ca50bc8b281c7806ecc2cdde959 Mon Sep 17 00:00:00 2001 From: Sanders DeNardi Date: Thu, 21 Dec 2017 12:29:07 -0500 Subject: [PATCH 6/6] move DB inside handler (for proper teardown) --- index.js | 6 +++--- src/db.js | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 2c88443..576d4b1 100644 --- a/index.js +++ b/index.js @@ -1,9 +1,9 @@ 'use strict'; const Feeds = require('./src/Feeds'); -const db = require('./src/db')(); const run = function() { + const db = require('./src/db')(); return Feeds.get(db).then((feeds) => { return feeds.run().then(() => { return feeds.cleanup(); @@ -16,9 +16,9 @@ const run = function() { module.exports = { handler: (event, context, callback) => { run().then(() => { - callback(); + context.done(); }).catch((err) => { - callback(err); + context.done(err); }); } }; diff --git a/src/db.js b/src/db.js index cc8a48c..dfc8cd4 100644 --- a/src/db.js +++ b/src/db.js @@ -28,6 +28,7 @@ module.exports = function() { return new Promise((resolve, reject) => { pool.end((err) => { if (err) { return reject(err); } + console.log('Closing database.'); return resolve(); }); });