
Commit

Merge branch 'ab385'
alastair committed Jul 2, 2019
2 parents 08f4dc5 + ec484b5 commit 5cd0e2d
Showing 10 changed files with 157 additions and 122 deletions.
admin/sql/create_indexes.sql (1 change: 0 additions & 1 deletion)
@@ -25,6 +25,5 @@ CREATE INDEX highlevel_ndx_highlevel_model ON highlevel_model (highlevel);
CREATE UNIQUE INDEX lower_musicbrainz_id_ndx_user ON "user" (lower(musicbrainz_id));

CREATE INDEX collected_ndx_statistics ON statistics (collected);
CREATE INDEX collected_hour_ndx_statistics ON statistics (date_part('hour'::text, timezone('UTC'::text, collected)));

COMMIT;
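The dropped expression index existed to serve the old statistics-page query, which had to pick the midnight row out of each day's hourly rows (see the old body of load_statistics_data further down in this diff). A minimal sketch of that old filter, using the project's own SQLAlchemy pattern; it is shown only for context and is no longer needed once statistics are stored once per day:

    from sqlalchemy import text
    import db

    # old-style daily filter: keep only rows collected at 00:00 UTC;
    # collected_hour_ndx_statistics indexed exactly this expression
    query = text("""
        SELECT collected
          FROM statistics
         WHERE date_part('hour', timezone('UTC'::text, collected)) = 0
    """)
    with db.engine.connect() as connection:
        midnight_rows = connection.execute(query).fetchall()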
admin/sql/create_primary_keys.sql (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@ ALTER TABLE highlevel_meta ADD CONSTRAINT highlevel_meta_pkey PRIMARY KEY (id);
ALTER TABLE highlevel_model ADD CONSTRAINT highlevel_model_pkey PRIMARY KEY (id);
ALTER TABLE model ADD CONSTRAINT model_pkey PRIMARY KEY (id);
ALTER TABLE version ADD CONSTRAINT version_pkey PRIMARY KEY (id);
ALTER TABLE statistics ADD CONSTRAINT statistics_pkey PRIMARY KEY (name, collected);
ALTER TABLE statistics ADD CONSTRAINT statistics_pkey PRIMARY KEY (collected);
ALTER TABLE incremental_dumps ADD CONSTRAINT incremental_dumps_pkey PRIMARY KEY (id);
ALTER TABLE "user" ADD CONSTRAINT user_pkey PRIMARY KEY (id);
ALTER TABLE dataset ADD CONSTRAINT dataset_pkey PRIMARY KEY (id);
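Reducing the primary key from (name, collected) to (collected) means there is at most one statistics row per timestamp, so a point lookup by collected yields a single JSONB document rather than one row per counter. A minimal sketch under that assumption; get_stats_for is a hypothetical helper, not part of this commit:

    from sqlalchemy import text
    import db

    def get_stats_for(collected):
        # at most one row can match, because collected is now the primary key
        with db.engine.connect() as connection:
            query = text("SELECT stats FROM statistics WHERE collected = :collected")
            row = connection.execute(query, {"collected": collected}).fetchone()
            return row["stats"] if row else None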
admin/sql/create_tables.sql (3 changes: 1 addition & 2 deletions)
@@ -59,8 +59,7 @@ CREATE TABLE model (

CREATE TABLE statistics (
collected TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
name TEXT NOT NULL,
value INTEGER NOT NULL
stats JSONB NOT NULL
);
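The per-row name/value pair is replaced by one JSONB document holding every counter collected at that timestamp. A minimal sketch of the new row shape; "lowlevel-total" is the only key name confirmed by the migration script below, and the remaining counters (the migration mentions six in total) are left out:

    # illustrative shape of one row in the new statistics table
    row = {
        "collected": "2019-06-11T00:00:00+00:00",   # TIMESTAMP WITH TIME ZONE
        "stats": {"lowlevel-total": 1000},          # JSONB, plus the other five counters
    }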

CREATE TABLE incremental_dumps (
admin/updates/20190611-statistics.sql (33 changes: 33 additions & 0 deletions)
@@ -0,0 +1,33 @@
BEGIN;

-- Add a column to store json values of statistics
ALTER TABLE statistics ADD COLUMN stats JSONB;

-- For each day at 00h00m00s, make a json object representing the stats for that
-- timestamp and add it to the column (This duplicates the same data for all 6 types, but we'll delete 5 of them)
UPDATE statistics SET stats = subquery.stats
FROM (SELECT collected
, jsonb_object_agg(name, value) as stats
FROM statistics
WHERE date_part('hour', timezone('UTC'::text, collected)) = 0
GROUP BY collected
ORDER BY collected DESC) as subquery
WHERE statistics.collected = subquery.collected;

-- Once we have combined stats in each row, delete the rows without stats (old hourly stats)
DELETE FROM statistics WHERE stats is NULL;
-- and keep only one stat per timestamp
DELETE FROM statistics WHERE name <> 'lowlevel-total';
-- And we no longer need name/value columns because we have the stats jsonb column
ALTER TABLE statistics DROP COLUMN name;
ALTER TABLE statistics DROP COLUMN value;

-- We no longer need a specific index for getting daily stats because stats are now only daily
DROP INDEX IF EXISTS collected_hour_ndx_statistics;

-- now that the stats column is populated, set it not null
ALTER TABLE statistics ALTER COLUMN stats SET NOT NULL;

ALTER TABLE statistics ADD CONSTRAINT statistics_pkey PRIMARY KEY (collected);

COMMIT;
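The jsonb_object_agg(name, value) call above is what folds the old per-name rows for a timestamp into a single document. A minimal worked sketch of the same folding in Python; the values are made up and "lowlevel-lossless" is a placeholder key name, only "lowlevel-total" appears in this script:

    # hypothetical pre-migration rows for one `collected` timestamp
    rows = [
        ("lowlevel-total", 1000),
        ("lowlevel-lossless", 250),   # placeholder name, not confirmed by the commit
    ]

    # what jsonb_object_agg(name, value) produces for that timestamp, and what
    # now lives in the `stats` column of the single remaining row
    stats = {name: value for name, value in rows}
    # -> {"lowlevel-total": 1000, "lowlevel-lossless": 250}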
db/dump.py (3 changes: 1 addition & 2 deletions)
@@ -90,8 +90,7 @@
"created",
),
"statistics": (
"name",
"value",
"stats",
"collected",
),
"incremental_dumps": (
db/stats.py (107 changes: 47 additions & 60 deletions)
@@ -4,16 +4,19 @@
pair.
"""
from brainzutils import cache
from sqlalchemy.dialects.postgresql import JSONB

import db
import db.exceptions
import datetime
import pytz
import calendar
import six
import json

from sqlalchemy import text
from sqlalchemy import text, bindparam

STATS_CACHE_TIMEOUT = 60 * 10 # 10 minutes
STATS_CACHE_TIMEOUT = 60 * 60 # 1 hour
LAST_MBIDS_CACHE_TIMEOUT = 60 # 1 minute (this query is cheap)

STATS_CACHE_KEY = "recent-stats"
@@ -74,12 +77,12 @@ def get_last_submitted_recordings():


def compute_stats(to_date):
"""Compute hourly stats to a given date and write them to
"""Compute daily stats to a given date and write them to
the database.
Take the date of most recent statistics, or if no statistics
are added, the earliest date of a submission, and compute and write
for every hour from that date to `to_date` the number of items
for every day from that date to `to_date` the number of items
in the database.
Args:
@@ -96,22 +99,28 @@ def compute_stats(to_date):
# If there are no lowlevel submissions, we stop
return

next_date = _get_next_hour(stats_date)
next_date = _get_next_day(stats_date)

while next_date < to_date:
stats = _count_submissions_to_date(connection, next_date)
_write_stats(connection, next_date, stats)
next_date = _get_next_hour(next_date)
next_date = _get_next_day(next_date)
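
A minimal usage sketch for compute_stats above; the call site is an assumption and does not appear in this diff:

    import datetime
    import pytz

    # fill in one statistics row per day, from the last recorded day up to "now"
    compute_stats(datetime.datetime.now(pytz.utc))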


def _write_stats(connection, date, stats):
"""Records a value with a given name and current timestamp."""
with connection.begin():
for name, value in six.iteritems(stats):
q = text("""
INSERT INTO statistics (collected, name, value)
VALUES (:collected, :name, :value)""")
connection.execute(q, {"collected": date, "name": name, "value": value})

if len(stats) != len(stats_key_map):
raise ValueError("provided stats map is of unexpected size")
for k, v in stats.items():
try:
int(v)
except ValueError:
raise ValueError("value %s in map isn't an integer" % v)
query = text("""
INSERT INTO statistics (collected, stats)
VALUES (:collected, :stats)""").bindparams(bindparam('stats', type_=JSONB))
connection.execute(query, {"collected": date, "stats": stats})
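
A minimal usage sketch for the rewritten _write_stats above: it now receives the whole counters dict, checks it against stats_key_map, verifies each value is an integer, and stores everything as one JSONB row. The call site and any key name other than "lowlevel-total" are assumptions, not from this commit:

    import datetime
    import pytz
    import db

    example_stats = {name: 0 for name in stats_key_map}   # every expected key must be present
    example_stats["lowlevel-total"] = 1000

    with db.engine.connect() as connection:
        _write_stats(connection,
                     datetime.datetime(2019, 6, 11, tzinfo=pytz.utc),
                     example_stats)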


def add_stats_to_cache():
Expand All @@ -125,21 +134,6 @@ def add_stats_to_cache():
time=STATS_CACHE_TIMEOUT, namespace=STATS_CACHE_NAMESPACE)


def has_incomplete_stats_rows():
"""Check if there are any statistics rows that are missing a key.
Returns: True if any stats hour has less than 6 stats keys, otherwise False"""

with db.engine.connect() as connection:
query = text("""
SELECT collected
, count(name)
FROM statistics
GROUP BY collected
HAVING count(name) < :numitems""")
result = connection.execute(query, {"numitems": len(stats_key_map)})
return result.rowcount > 0


def get_stats_summary():
"""Load a summary of statistics to show on the homepage.
@@ -161,8 +155,12 @@ def _get_stats_from_cache():
"""Get submission statistics from cache"""
stats = cache.get(STATS_CACHE_KEY, namespace=STATS_CACHE_NAMESPACE)
last_collected = cache.get(STATS_CACHE_LAST_UPDATE_KEY,
namespace=STATS_CACHE_NAMESPACE)
namespace=STATS_CACHE_NAMESPACE)

# TODO: See BU-28, a datetime from the cache loses its timezone. In this case we
# know that it's at utc, so force it
if last_collected:
last_collected = pytz.utc.localize(last_collected)
return last_collected, stats
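
A minimal sketch of why the BU-28 workaround above pins the cached timestamp to UTC: a datetime coming back from the cache is naive, and get_statistics_history compares it against the timezone-aware timestamps read from the database:

    import datetime
    import pytz

    naive = datetime.datetime(2019, 6, 11)   # what the cache hands back (BU-28)
    aware = pytz.utc.localize(naive)         # pinned to UTC, as in the code above
    # comparing `naive` directly with an aware datetime raises
    # TypeError: can't compare offset-naive and offset-aware datetimes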


@@ -191,44 +189,32 @@ def format_statistics_for_highcharts(data):


def load_statistics_data(limit=None):
# Postgres doesn't let you create a json dictionary using values
# from one column as keys and another column as values. Instead we
# create an array of {"name": name, "value": value} objects and change
# it in python
args = {}
qtext = """
SELECT collected
, json_agg(row_to_json(
(SELECT r FROM (SELECT name, value) r) )) AS stats
FROM statistics
WHERE date_part('hour', timezone('UTC'::text, collected)) = 0
OR collected = (SELECT collected
FROM statistics
ORDER BY collected DESC
LIMIT 1)
GROUP BY collected
ORDER BY collected DESC
qtext = """SELECT collected, stats
FROM statistics
ORDER BY collected DESC
"""
if limit:
args["limit"] = int(limit)
qtext += " LIMIT :limit"
query = text(qtext)
with db.engine.connect() as connection:
stats_result = connection.execute(query, args)
ret = []
for line in stats_result:
row = {"collected": line["collected"], "stats": {}}
for stat in line["stats"]:
row["stats"][stat["name"]] = stat["value"]
ret.append(row)

# We order by DESC in order to use the `limit` parameter, but
# we actually need the stats in increasing order.
return list(reversed(ret))
# We order by DESC in order to use the `limit` parameter, but
# we actually need the stats in increasing order.
return list(reversed([dict(r) for r in stats_result]))


def get_statistics_history():
return format_statistics_for_highcharts(load_statistics_data())
stats = load_statistics_data()
cached_stats_date, cached_stats = _get_stats_from_cache()
# If cached stats exist and it's newer than the most recent database stats,
# add it to the end. Don't add cached stats if there are no database stats
if cached_stats_date and stats:
last_stats_collected = stats[-1]["collected"]
if cached_stats_date > last_stats_collected:
stats.append({"collected": cached_stats_date, "stats": cached_stats})
return format_statistics_for_highcharts(stats)
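
A minimal sketch, not from this commit, of how the reworked functions above fit together: load_statistics_data returns the stored JSONB documents directly, oldest first, and get_statistics_history appends the cached, not-yet-persisted snapshot before charting:

    recent = load_statistics_data(limit=30)   # the 30 most recent rows, returned oldest first
    # each element looks like
    #   {"collected": <timezone-aware datetime>, "stats": {"lowlevel-total": ..., ...}}
    history = get_statistics_history()        # the same rows plus the cached snapshot, formatted for Highcharts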


def _count_submissions_to_date(connection, to_date):
@@ -318,12 +304,13 @@ def _get_most_recent_stats_date(connection):
return row[0]


def _get_next_hour(date):
"""Round up a date to the nearest hour:00:00.
def _get_next_day(date):
"""Round up a date to the nearest day:00:00:00
Arguments:
date: a datetime
date: a datetime
"""
delta = datetime.timedelta(hours=1)
delta = datetime.timedelta(days=1)
date = date + delta
date = date.replace(minute=0, second=0, microsecond=0)
date = date.replace(hour=0, minute=0, second=0, microsecond=0)
return date
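
A minimal worked example of the renamed rounding helper above:

    import datetime

    _get_next_day(datetime.datetime(2019, 6, 11, 15, 23, 45))
    # -> datetime.datetime(2019, 6, 12, 0, 0)   (the following midnight)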
