conformance-profiles: Database table and worker to process data
jarofgreen committed Jul 22, 2020
1 parent b031ceb commit 24190ea
Showing 8 changed files with 137 additions and 6 deletions.
24 changes: 23 additions & 1 deletion docs/stage/profile-normalised-data.md
@@ -1,3 +1,25 @@
# Profile Normalised Data

Not Written Yet
This will profile all normalised data against the data profiles and store the results in the database.

To run this:

`$ node ./src/bin/profile-normalised-data.js`

It can be stopped at any time; it will not leave the database in a bad state or lose too much work.

When restarted it will pick up where it left off.

## Database Storage & Errors

It will store the results of this in the `normalised_data_profile_results` table.

Rows are stored per item of normalised data and per profile,
so you should expect this table to have up to 3 times as many rows as `normalised_data` (if there are 3 profiles).

For any data profile and normalised data item, there are 4 states:

* no row - we haven't tried to run the check yet
* a row with `checked=FALSE` - we tried to run the check but it errored. See `error_checking_message`.
* a row with `checked=TRUE` and nothing in `results` - we checked it and the data passed!
* a row with `checked=TRUE` and things in `results` - we checked it and the data failed. See `results`.
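
To see how a profile run is progressing, you can group this table by those states. A minimal sketch using the project's own `database_pool` (the relative import path is an assumption, as is treating a pass as `results` being NULL or an empty JSON array; items with no row yet won't appear here at all):

```js
import { database_pool } from '../../src/lib/database.js'; // path assumed

// Count the stored rows in each state for one profile. Items not yet
// attempted have no row in this table, so they are not counted here.
async function count_profile_states(profile_name) {
    const client = await database_pool.connect();
    try {
        const res = await client.query(
            'SELECT checked, (results IS NULL OR results = \'[]\'::jsonb) AS passed, COUNT(*) AS total ' +
            'FROM normalised_data_profile_results ' +
            'WHERE profile_name=$1 ' +
            'GROUP BY 1, 2',
            [profile_name]
        );
        return res.rows; // e.g. [ { checked: true, passed: true, total: '123' }, ... ]
    } finally {
        client.release();
    }
}
```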
4 changes: 2 additions & 2 deletions src/bin/heroku.js
@@ -3,6 +3,7 @@ import spider from '../lib/spider.js';
import download_raw_all_publisher_feeds from '../lib/download-raw.js';
import normalise_data_all_publisher_feeds from '../lib/normalise-data.js';
import validate_raw_data_all from '../lib/validate-raw-data.js';
import profile_normalised_data_all from '../lib/profile-normalised-data.js';
import Settings from '../lib/settings.js';
import tls from 'tls';
import Utils from '../lib/utils.js';
@@ -19,8 +20,7 @@ spider(Settings.spiderDataCatalogStartURL);
download_raw_all_publisher_feeds();
validate_raw_data_all();
normalise_data_all_publisher_feeds();
// TODO need to add data profile work here, when it exists

profile_normalised_data_all();

// When a Heroku worker ends, Heroku starts a new one. https://devcenter.heroku.com/articles/dynos#restarting
// When there is no work to be done, we don't want the worker to restart, immediately find nothing to do, exit, and be restarted again in a tight loop
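The end of `heroku.js` is collapsed in this diff, but given the `herokuWorkerMinimumCycleHours` setting further down, the guard this comment describes presumably looks something like the following sketch (an assumption, not the file's actual code):

```js
import Settings from '../lib/settings.js';

// Hypothetical sketch: pad the worker's run out to the configured minimum
// cycle length, so an idle worker isn't restarted by Heroku in a tight loop.
async function enforce_minimum_cycle(started_at_ms) {
    const minimum_ms = Settings.herokuWorkerMinimumCycleHours * 60 * 60 * 1000;
    const remaining_ms = minimum_ms - (Date.now() - started_at_ms);
    if (remaining_ms > 0) {
        await new Promise(resolve => setTimeout(resolve, remaining_ms));
    }
}
```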
5 changes: 5 additions & 0 deletions src/bin/profile-normalised-data.js
@@ -0,0 +1,5 @@
#!/usr/bin/env node
import profile_normalised_data_all from '../lib/profile-normalised-data.js';


profile_normalised_data_all();
9 changes: 9 additions & 0 deletions src/lib/database-migrations/011-data-profiles.sql
@@ -0,0 +1,9 @@
CREATE TABLE normalised_data_profile_results (
normalised_data_id BIGINT NOT NULL,
profile_name TEXT NOT NULL,
checked BOOLEAN NOT NULL,
error_checking_message TEXT NULL,
results JSONB NULL,
PRIMARY KEY(normalised_data_id, profile_name),
CONSTRAINT normalised_data_profile_results_normalised_data_id FOREIGN KEY (normalised_data_id) REFERENCES normalised_data(id)
);
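The composite primary key guarantees at most one result row per normalised data item per profile; the worker relies on this, treating the absence of a row as "not yet checked".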
1 change: 1 addition & 0 deletions src/lib/database.js
@@ -24,6 +24,7 @@ async function delete_database() {
client = await database_pool.connect();
await client.query('DROP TABLE IF EXISTS download_raw_errors CASCADE');
await client.query('DROP TABLE IF EXISTS spider_data_catalog_error CASCADE');
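// normalised_data_profile_results references normalised_data, so drop it first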
await client.query('DROP TABLE IF EXISTS normalised_data_profile_results CASCADE');
await client.query('DROP TABLE IF EXISTS normalised_data CASCADE');
await client.query('DROP TABLE IF EXISTS raw_data CASCADE');
await client.query('DROP TABLE IF EXISTS publisher_feed CASCADE');
9 changes: 7 additions & 2 deletions src/lib/normalise-data.js
@@ -70,14 +70,19 @@ async function store_normalised_callback(raw_data_id, normalised_events) {
normalised_event.parentId
];

await client.query(
const res = await client.query(
'INSERT INTO normalised_data (raw_data_id, data_id, data_deleted, data, data_kind, raw_data_parent_id) ' +
'VALUES ($1, $2, \'f\', $3, $4, $5) ' +
'ON CONFLICT (data_id) DO UPDATE SET ' +
'raw_data_id=$1, data_id=$2, data=$3, data_kind=$4, raw_data_parent_id=$5, updated_at=(now() at time zone \'utc\'), data_deleted=\'f\'' ,
'raw_data_id=$1, data_id=$2, data=$3, data_kind=$4, raw_data_parent_id=$5, updated_at=(now() at time zone \'utc\'), data_deleted=\'f\' ' +
'RETURNING id',
query_data
);

// Because we have updated the data, the results of the profile checks are now stale. Delete them so we recalculate.
// (Strictly this is only needed on UPDATE, not INSERT, but we can't tell which happened.)
await client.query('DELETE FROM normalised_data_profile_results WHERE normalised_data_id=$1', [res.rows[0].id]);

}

await client.query(
87 changes: 87 additions & 0 deletions src/lib/profile-normalised-data.js
@@ -0,0 +1,87 @@
import { database_pool } from './database.js';
import Settings from './settings.js';
import apply_data_profile from './util-data-profile.js';



async function profile_normalised_data_all() {
for(var profile_name of Settings.dataProfiles) {
// Deliberately not awaited - all profiles are processed concurrently
profile_normalised_data_all_for_profile(profile_name);
}
}

async function profile_normalised_data_all_for_profile(profile_name) {

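// Anti-join: select normalised data items that have no result row yet for
// this profile, oldest updated first, in batches of 10.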
const select_sql = 'SELECT normalised_data.* FROM normalised_data '+
'LEFT JOIN normalised_data_profile_results '+
'ON normalised_data_profile_results.normalised_data_id = normalised_data.id AND normalised_data_profile_results.profile_name=$1 '+
'WHERE normalised_data_profile_results.normalised_data_id IS NULL AND normalised_data.data_deleted=FALSE '+
'ORDER BY normalised_data.updated_at ASC LIMIT 10';

while(true) {

let rows = []

// Step 1 - load data to process
// Open the database connection and make sure we CLOSE it after this step, so it is not held open unnecessarily while the rows are processed
const client = await database_pool.connect();
try {
const res_find_raw_data = await client.query(select_sql, [profile_name]);
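// No unprocessed items remain for this profile - we're done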
if (res_find_raw_data.rows.length == 0) {
break;
}
// Copy out plain row data so we don't hold on to database cursors, etc
for (var row of res_find_raw_data.rows) {
rows.push(row)
}
} catch(error) {
console.error("ERROR validate_raw_data_all");
console.error(error);
} finally {
client.release()
}

// Step 2 - process each item of data we got
for (var normalised_data of rows) {
await profile_normalised_data_for_item_for_profile(normalised_data, profile_name);
}

}

}

async function profile_normalised_data_for_item_for_profile(normalised_data, profile_name) {

//console.log("Profiling Normalised Data id "+ normalised_data.id + " for Profile " + profile_name);

const results = await apply_data_profile(normalised_data.data, profile_name);

const client = await database_pool.connect();
try {
if (results.done) {
await client.query(
"INSERT INTO normalised_data_profile_results (normalised_data_id, profile_name, checked, results) VALUES ($1, $2, TRUE, $3)",
[normalised_data.id, profile_name, JSON.stringify(results.results)]
);
} else {
await client.query(
"INSERT INTO normalised_data_profile_results (normalised_data_id, profile_name, checked, error_checking_message) VALUES ($1, $2, FALSE, $3)",
[normalised_data.id, profile_name, results.error]
);
}
} catch(error) {
console.error("ERROR profile_normalised_data_for_item_for_profile");
console.error(error);
} finally {
client.release()
}

}


export {
profile_normalised_data_all,
};

export default profile_normalised_data_all;
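
`src/lib/util-data-profile.js` is not part of this commit, but the worker above fixes the contract `apply_data_profile` must satisfy. A hypothetical stand-in (the example rule is invented purely for illustration; only the return shape is taken from the code above):

```js
// NOT the real util-data-profile.js - a minimal stand-in showing the return
// shape that profile-normalised-data.js relies on.
async function apply_data_profile(data, profile_name) {
    try {
        // profile_name would select which rule set to run
        const failures = []; // each failing rule would push a description here
        if (typeof data !== 'object' || data === null) {
            failures.push({ path: '$', message: 'data must be a JSON object' });
        }
        // done: true means the check itself ran; empty results means the data passed
        return { done: true, results: failures };
    } catch (error) {
        // done: false means the check could not be run at all
        return { done: false, error: String(error) };
    }
}

export default apply_data_profile;
```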
4 changes: 3 additions & 1 deletion src/lib/settings.js
@@ -30,7 +30,9 @@ const Settings = {
// Some publishers may only support older versions
"tlsDefaultMinimumVersion": "TLSv1",

"herokuWorkerMinimumCycleHours": 6
"herokuWorkerMinimumCycleHours": 6,

"dataProfiles": ["core", "accessibility", "socialrx"]

}
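
Each name in `dataProfiles` gets its own concurrent processing loop in `profile_normalised_data_all()` and is stored as `profile_name` in `normalised_data_profile_results`.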

