diff --git a/.env.example b/.env.example index 7db4f7fb..14ecb3aa 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,7 @@ NETWORK=testnet MAINTENANCE= HUB_DATABASE_URL=mysql://... SEQ_DATABASE_URL=mysql://... +ENVELOP_DATABASE_URL=mysql:// RELAYER_PK=0x123... DEFAULT_NETWORK=1 SHUTTER_URL=https://... diff --git a/src/helpers/mysql.ts b/src/helpers/mysql.ts index a4f2a1c5..5c0f81f2 100644 --- a/src/helpers/mysql.ts +++ b/src/helpers/mysql.ts @@ -33,7 +33,21 @@ sequencerConfig.connectTimeout = 60e3; sequencerConfig.acquireTimeout = 60e3; sequencerConfig.timeout = 60e3; sequencerConfig.charset = 'utf8mb4'; -bluebird.promisifyAll([Pool, Connection]); const sequencerDB = mysql.createPool(sequencerConfig); -export { hubDB as default, sequencerDB }; +// @ts-ignore +const envelopConfig = parse(process.env.ENVELOP_DATABASE_URL); +envelopConfig.connectionLimit = connectionLimit; +envelopConfig.multipleStatements = true; +envelopConfig.database = envelopConfig.path[0]; +envelopConfig.host = envelopConfig.hosts[0].name; +envelopConfig.port = envelopConfig.hosts[0].port; +envelopConfig.connectTimeout = 60e3; +envelopConfig.acquireTimeout = 60e3; +envelopConfig.timeout = 60e3; +envelopConfig.charset = 'utf8mb4'; +const envelopDB = mysql.createPool(envelopConfig); + +bluebird.promisifyAll([Pool, Connection]); + +export { hubDB as default, sequencerDB, envelopDB }; diff --git a/src/ingestor.ts b/src/ingestor.ts index 5a503251..c1aad9cd 100644 --- a/src/ingestor.ts +++ b/src/ingestor.ts @@ -31,6 +31,10 @@ const NETWORK_METADATA = { } }; +function shouldPinIpfs(type: string, message: any) { + return !(type === 'email-subscription' && message.email); +} + export default async function ingestor(req) { if (flaggedIps.includes(sha256(getIp(req)))) { return Promise.reject('unauthorized'); @@ -92,7 +96,11 @@ export default async function ingestor(req) { } let aliased = false; - if (!['settings', 'alias', 'profile'].includes(type)) { + if ( + !['settings', 'alias', 'profile', 'email-subscription', 'delete-email-subscription'].includes( + type + ) + ) { if (!message.space) return Promise.reject('unknown space'); try { @@ -106,7 +114,16 @@ export default async function ingestor(req) { } // Check if signing address is an alias - const aliasTypes = ['follow', 'unfollow', 'subscribe', 'unsubscribe', 'profile', 'statement']; + const aliasTypes = [ + 'follow', + 'unfollow', + 'subscribe', + 'unsubscribe', + 'profile', + 'statement', + 'email-subscription', + 'delete-email-subscription' + ]; const aliasOptionTypes = ['vote', 'vote-array', 'vote-string', 'proposal', 'delete-proposal']; if (body.address !== message.from) { if (!aliasTypes.includes(type) && !aliasOptionTypes.includes(type)) @@ -206,6 +223,13 @@ export default async function ingestor(req) { type = 'vote'; } + if (type === 'email-subscription') { + payload = { + email: message.email, + subscriptions: message.subscriptions + }; + } + let legacyBody: any = { address: message.from, msg: JSON.stringify({ @@ -245,7 +269,7 @@ export default async function ingestor(req) { ...restBody }; [pinned, receipt] = await Promise.all([ - pin(ipfsBody, process.env.PINEAPPLE_URL), + shouldPinIpfs(type, message) ? pin(ipfsBody, process.env.PINEAPPLE_URL) : { cid: '' }, issueReceipt(formattedSignature) ]); } catch (e) { diff --git a/src/writer/delete-email-subscription.ts b/src/writer/delete-email-subscription.ts new file mode 100644 index 00000000..f509fa66 --- /dev/null +++ b/src/writer/delete-email-subscription.ts @@ -0,0 +1,22 @@ +import { envelopDB } from '../helpers/mysql'; + +type Message = { address: string }; + +export async function verify(message: Message): Promise { + const result = await envelopDB.queryAsync( + 'SELECT * FROM subscribers WHERE address = ? AND verified > 0 LIMIT 1', + [message.address] + ); + + if (!result[0]) { + return Promise.reject('user not subscribed'); + } + + return true; +} + +export async function action(message: Message): Promise { + await envelopDB.queryAsync('DELETE FROM subscribers WHERE address = ? AND verified > 0 LIMIT 1', [ + message.address + ]); +} diff --git a/src/writer/email-subscription.ts b/src/writer/email-subscription.ts new file mode 100644 index 00000000..f6fa13d5 --- /dev/null +++ b/src/writer/email-subscription.ts @@ -0,0 +1,85 @@ +import snapshot from '@snapshot-labs/snapshot.js'; +import log from '../helpers/log'; +import { envelopDB } from '../helpers/mysql'; +import { jsonParse } from '../helpers/utils'; + +type Message = { msg: string; address: string }; +type Payload = { + email?: string; + subscriptions?: string[]; +}; + +function extractPayload(message: Message): Payload { + return jsonParse(message.msg).payload; +} + +export async function verify(message: Message): Promise { + const payload = extractPayload(message); + + const schemaIsValid = snapshot.utils.validateSchema(snapshot.schemas.emailSubscription, payload); + if (schemaIsValid !== true) { + log.warn(`[writer] Wrong email subscription format ${JSON.stringify(schemaIsValid)}`); + return Promise.reject('wrong email subscription format'); + } + + if (payload.email?.length) { + return verifySubscriptionCreation(message, payload); + } else { + return verifySubscriptionUpdate(message, payload); + } +} + +export async function action(message: Message): Promise { + const payload = extractPayload(message); + + if (payload.email?.length) { + await createAction(message, payload); + } else { + await updateAction(message, payload); + } +} + +async function verifySubscriptionCreation(message: Message, payload: Payload): Promise { + const result = await envelopDB.queryAsync( + `SELECT * FROM subscribers WHERE address = ? AND email = ? LIMIT 1`, + [message.address, payload.email] + ); + + if (result[0]) { + return Promise.reject('email already subscribed'); + } + + return true; +} + +async function verifySubscriptionUpdate(message: Message, payload: Payload): Promise { + const result = await envelopDB.queryAsync( + `SELECT * FROM subscribers WHERE address = ? ORDER BY verified DESC LIMIT 1`, + [message.address, payload.email] + ); + + if (!result[0]) { + return Promise.reject('email not subscribed'); + } + + if (!result[0].verified) { + return Promise.reject('email not verified'); + } + + return true; +} + +async function createAction(message: Message, payload: Payload) { + await envelopDB.queryAsync(`INSERT INTO subscribers (email, address, created) VALUES(?, ?, ?)`, [ + payload.email, + message.address, + (Date.now() / 1e3).toFixed() + ]); +} + +async function updateAction(message: Message, payload: Payload) { + await envelopDB.queryAsync( + `UPDATE subscribers SET subscriptions = ? WHERE address = ? AND verified > 0 LIMIT 1`, + [JSON.stringify(payload.subscriptions), message.address] + ); +} diff --git a/src/writer/index.ts b/src/writer/index.ts index 96be0b62..4a4a2e8f 100644 --- a/src/writer/index.ts +++ b/src/writer/index.ts @@ -1,6 +1,8 @@ import * as alias from './alias'; +import * as deleteEmailSubscription from './delete-email-subscription'; import * as deleteProposal from './delete-proposal'; import * as deleteSpace from './delete-space'; +import * as emailSubscription from './email-subscription'; import * as flagProposal from './flag-proposal'; import * as follow from './follow'; import * as profile from './profile'; @@ -25,6 +27,8 @@ export default { unfollow, subscribe, unsubscribe, + 'email-subscription': emailSubscription, + 'delete-email-subscription': deleteEmailSubscription, alias, profile, statement diff --git a/src/writer/statement.ts b/src/writer/statement.ts index 82337d0e..75a0868b 100644 --- a/src/writer/statement.ts +++ b/src/writer/statement.ts @@ -5,6 +5,7 @@ import { DEFAULT_NETWORK_ID, jsonParse, NETWORK_IDS } from '../helpers/utils'; export async function verify(body): Promise { const msg = jsonParse(body.msg, {}); + const schemaIsValid = snapshot.utils.validateSchema(snapshot.schemas.statement, msg.payload); if (schemaIsValid !== true) { log.warn(`[writer] Wrong statement format ${JSON.stringify(schemaIsValid)}`); diff --git a/test/.env.test b/test/.env.test index e8bb7e75..776f296b 100644 --- a/test/.env.test +++ b/test/.env.test @@ -1,5 +1,6 @@ HUB_DATABASE_URL=mysql://root:root@127.0.0.1:3306/snapshot_sequencer_test SEQ_DATABASE_URL=mysql://root:root@127.0.0.1:3306/snapshot_sequencer_test +ENVELOP_DATABASE_URL=mysql://root:root@127.0.0.1:3306/snapshot_sequencer_test NETWORK=mainnet RELAYER_PK=01686849e86499c1860ea0afc97f29c11018cbac049abf843df875c60054076e NODE_ENV=test diff --git a/test/integration/writer/delete-email-subscription.test.ts b/test/integration/writer/delete-email-subscription.test.ts new file mode 100644 index 00000000..91f3bda1 --- /dev/null +++ b/test/integration/writer/delete-email-subscription.test.ts @@ -0,0 +1,71 @@ +import db, { envelopDB, sequencerDB } from '../../../src/helpers/mysql'; +import { action, verify } from '../../../src/writer/delete-email-subscription'; + +describe('writer/delete-subscription', () => { + const TEST_PREFIX = 'test-delete-subscription'; + + afterAll(async () => { + await envelopDB.queryAsync('DELETE FROM subscribers'); + await envelopDB.endAsync(); + await db.endAsync(); + await sequencerDB.endAsync(); + }); + + describe('verify()', () => { + beforeAll(async () => { + await Promise.all([ + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [`${TEST_PREFIX}-0x0`, 'test@snapshot.org', '[]', 0, 0] + ), + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [`${TEST_PREFIX}-0x1`, 'test1@snapshot.org', '[]', 0, 1] + ) + ]); + }); + + it('rejects when the address is not subscribed', () => { + return expect(verify({ address: '0x0' })).rejects.toEqual(`user not subscribed`); + }); + + it('rejects when the address is not verified', () => { + return expect(verify({ address: `${TEST_PREFIX}-0x0` })).rejects.toEqual( + `user not subscribed` + ); + }); + + it('resolves when the address is verified', () => { + expect(verify({ address: `${TEST_PREFIX}-0x1` })).resolves; + }); + }); + + describe('action()', () => { + const address = `${TEST_PREFIX}-0x3`; + + beforeAll(async () => { + await Promise.all([ + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [address, 'test@snapshot.org', '[]', 0, 0] + ), + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [address, 'test1@snapshot.org', '[]', 0, 1] + ) + ]); + }); + + it('deletes the subscription', async () => { + await action({ address: address }); + + const results = await envelopDB.queryAsync('SELECT * FROM subscribers WHERE address = ?', [ + address + ]); + + // Only delete the verified subscription + expect(results.length).toBe(1); + expect(results[0].email).toEqual('test@snapshot.org'); + }); + }); +}); diff --git a/test/integration/writer/email-subscription.test.ts b/test/integration/writer/email-subscription.test.ts new file mode 100644 index 00000000..66340cdc --- /dev/null +++ b/test/integration/writer/email-subscription.test.ts @@ -0,0 +1,61 @@ +import db, { envelopDB, sequencerDB } from '../../../src/helpers/mysql'; +import { action, verify } from '../../../src/writer/email-subscription'; + +describe('writer/subscription', () => { + const TEST_PREFIX = 'test-subscription'; + const msg = JSON.stringify({ payload: { email: 'test@snapshot.org', subscriptions: [] } }); + + afterAll(async () => { + await envelopDB.queryAsync('DELETE FROM subscribers'); + await envelopDB.endAsync(); + await db.endAsync(); + await sequencerDB.endAsync(); + }); + + describe('verify()', () => { + const address = `${TEST_PREFIX}-0x0`; + const invalidMsg = JSON.stringify({ + payload: { email: 'not an email' } + }); + + beforeAll(async () => { + await envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [address, 'test@snapshot.org', '[]', 0, 0] + ); + }); + + it('rejects when the address is already subscribed', () => { + return expect(verify({ address: address, msg })).rejects.toEqual('email already subscribed'); + }); + + it('rejects when the subscription type is not valid', () => { + return expect(verify({ address: address, msg: invalidMsg })).rejects.toEqual( + 'wrong email subscription format' + ); + }); + + it('resolves when all args are valid', () => { + return expect(verify({ address: `${TEST_PREFIX}-0x1`, msg })).resolves.toBe(true); + }); + }); + + describe('action()', () => { + const address = `${TEST_PREFIX}-0x1`; + + it('creates a subscription', async () => { + await action({ + address: address, + msg + }); + + const result = await envelopDB.queryAsync( + `SELECT * FROM subscribers WHERE address = ? LIMIT 1`, + [address] + ); + + expect(result[0].email).toEqual('test@snapshot.org'); + expect(result[0].verified).toEqual(0); + }); + }); +}); diff --git a/test/integration/writer/update-email-subscription.test.ts b/test/integration/writer/update-email-subscription.test.ts new file mode 100644 index 00000000..26f1da6c --- /dev/null +++ b/test/integration/writer/update-email-subscription.test.ts @@ -0,0 +1,79 @@ +import db, { envelopDB, sequencerDB } from '../../../src/helpers/mysql'; +import { action, verify } from '../../../src/writer/email-subscription'; + +describe('writer/update-subscription', () => { + const TEST_PREFIX = 'test-update-subscription'; + + afterAll(async () => { + await envelopDB.queryAsync('DELETE FROM subscribers'); + await envelopDB.endAsync(); + await db.endAsync(); + await sequencerDB.endAsync(); + }); + + describe('verify()', () => { + const msg = JSON.stringify({ payload: { subscriptions: ['closedProposal'] } }); + const msgWithInvalidSubscriptions = JSON.stringify({ + payload: { subscriptions: ['test'] } + }); + + beforeAll(async () => { + await Promise.all([ + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [`${TEST_PREFIX}-0x0`, 'test@snapshot.org', '[]', 0, 0] + ), + envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [`${TEST_PREFIX}-0x1`, 'test1@snapshot.org', '[]', 0, 1] + ) + ]); + }); + + it('rejects when the address is not subscribed', () => { + return expect(verify({ address: '0x0', msg })).rejects.toEqual(`email not subscribed`); + }); + + it('rejects when the address is not verified', () => { + return expect(verify({ address: `${TEST_PREFIX}-0x0`, msg })).rejects.toEqual( + `email not verified` + ); + }); + + it('rejects when subscription values are not valid', () => { + return expect( + verify({ address: `${TEST_PREFIX}-0x1`, msg: msgWithInvalidSubscriptions }) + ).rejects.toEqual(`wrong email subscription format`); + }); + + it('resolves when all args are valid', () => { + expect(verify({ address: `${TEST_PREFIX}-0x1`, msg })).resolves; + }); + }); + + describe('action()', () => { + const address = `${TEST_PREFIX}-0x3`; + const subscriptions = ['newProposal', 'closedProposal']; + + beforeAll(async () => { + await envelopDB.queryAsync( + 'INSERT INTO subscribers SET address = ?, email = ?, subscriptions = ?, created = ?, verified = ?', + [address, 'test@snapshot.org', '["summary"]', 0, 1] + ); + }); + + it('updates the subscription', async () => { + await action({ + address, + msg: JSON.stringify({ payload: { subscriptions } }) + }); + + const result = await envelopDB.queryAsync( + 'SELECT subscriptions FROM subscribers WHERE address = ? AND verified > 0 LIMIT 1', + [address] + ); + + expect(JSON.parse(result[0].subscriptions)).toEqual(subscriptions); + }); + }); +}); diff --git a/test/schema_envelop.sql b/test/schema_envelop.sql new file mode 100644 index 00000000..8e67b7ab --- /dev/null +++ b/test/schema_envelop.sql @@ -0,0 +1,11 @@ +CREATE TABLE subscribers ( + email VARCHAR(256) NOT NULL, + address VARCHAR(256) NOT NULL, + subscriptions JSON DEFAULT NULL, + created BIGINT NOT NULL, + verified BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY (email, address), + UNIQUE KEY idx_address_email (address, email), + INDEX created (created), + INDEX verified (verified) +); diff --git a/test/setupDb.ts b/test/setupDb.ts index b2aff5e1..bbd37184 100755 --- a/test/setupDb.ts +++ b/test/setupDb.ts @@ -1,9 +1,9 @@ -import mysql from 'mysql'; -import Pool from 'mysql/lib/Pool'; -import Connection from 'mysql/lib/Connection'; +import fs from 'fs'; import bluebird from 'bluebird'; import parse from 'connection-string'; -import fs from 'fs'; +import mysql from 'mysql'; +import Connection from 'mysql/lib/Connection'; +import Pool from 'mysql/lib/Pool'; // @ts-ignore const config = parse(process.env.HUB_DATABASE_URL); @@ -19,6 +19,8 @@ if (!dbName.endsWith('_test')) { process.exit(1); } +const schemaFiles = ['./test/schema.sql', './test/schema_envelop.sql']; + async function run() { const splitToken = ');'; @@ -30,8 +32,9 @@ async function run() { console.info(`- Creating new database: ${dbName}`); await db.queryAsync(`CREATE DATABASE ${dbName}`); - const schema = fs - .readFileSync('./test/schema.sql', 'utf8') + const schema = schemaFiles + .map(file => fs.readFileSync(file, 'utf8')) + .join(' ') .replaceAll('CREATE TABLE ', `CREATE TABLE ${dbName}.`) .split(splitToken) .filter(s => s.trim().length > 0);