Skip to content

Commit

Permalink
Add a JSONB column to the database to make jsonpath queries
Browse files Browse the repository at this point in the history
Signed-off-by: Aurélien Bompard <[email protected]>
  • Loading branch information
abompard committed May 16, 2024
1 parent 99fb739 commit 0589b6f
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 60 deletions.
22 changes: 2 additions & 20 deletions datanommer.commands/datanommer/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,18 @@
from datetime import datetime, timedelta, timezone

import click
from fedora_messaging import config as fedora_messaging_config
from sqlalchemy import func, select

import datanommer.models as m

from .utils import config_option, get_config


__version__ = importlib.metadata.version("datanommer.commands")

log = logging.getLogger("datanommer")


def get_config(config_path=None):
if config_path:
fedora_messaging_config.conf.load_config(config_path)
conf = fedora_messaging_config.conf["consumer_config"]
for key in ("datanommer_sqlalchemy_url", "alembic_ini"):
if key not in conf:
raise click.ClickException(f"{key} not defined in the fedora-messaging config")
return conf


config_option = click.option(
"-c",
"--config",
"config_path",
help="Load this Fedora Messaging config file",
type=click.Path(exists=True, readable=True),
)


@click.command()
@config_option
def create(config_path):
Expand Down
43 changes: 13 additions & 30 deletions datanommer.commands/datanommer/commands/extract_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import click
from fedora_messaging.exceptions import ValidationError
from fedora_messaging.message import load_message as load_message
from sqlalchemy import and_, func, not_, select
from sqlalchemy import and_, not_, select

import datanommer.models as m

from . import config_option, get_config
from .utils import CHUNK_SIZE, config_option, get_config, iterate_over_messages


# Go trough messages these at a time
CHUNK_SIZE = 10000
log = logging.getLogger(__name__)

SKIP_TOPICS = [
Expand Down Expand Up @@ -111,32 +109,17 @@ def main(config_path, topic, category, start, end, force_schema, chunk_size, deb
isouter=True,
).where(m.users_assoc_table.c.msg_id.is_(None))

total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
if not total:
click.echo("No messages matched.")
return

click.echo(f"Considering {total} message{'s' if total > 1 else ''}")

query = query.order_by(m.Message.timestamp)
with click.progressbar(length=total) as bar:
for chunk in range(int(total / chunk_size) + 1):
offset = chunk * chunk_size
chunk_query = query.limit(chunk_size).offset(offset)
for message in m.session.scalars(chunk_query):
bar.update(1)
usernames = get_usernames(message, force_schema=force_schema)
if not usernames:
m.session.expunge(message)
continue
message._insert_list(m.User, m.users_assoc_table, usernames)
if debug:
click.echo(
f"Usernames for message {message.msg_id} of topic {message.topic}"
f": {', '.join(usernames)}"
)
m.session.commit()
m.session.expunge_all()
for message in iterate_over_messages(query, chunk_size):
usernames = get_usernames(message, force_schema=force_schema)
if not usernames:
m.session.expunge(message)
continue
message._insert_list(m.User, m.users_assoc_table, usernames)
if debug:
click.echo(
f"Usernames for message {message.msg_id} of topic {message.topic}"
f": {', '.join(usernames)}"
)


def get_usernames(db_message, force_schema):
Expand Down
96 changes: 96 additions & 0 deletions datanommer.commands/datanommer/commands/populate_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import datetime
import logging

import click
from fedora_messaging.message import load_message as load_message
from sqlalchemy import cast, select, update
from sqlalchemy.dialects import postgresql

import datanommer.models as m

from .utils import config_option, get_config


log = logging.getLogger(__name__)


@click.command()
@config_option
@click.option(
"--chunk-size",
default=30,
type=int,
show_default=True,
help="Go through messages these many days at a time (lower is slower but saves memory).",
)
@click.option(
"--debug",
is_flag=True,
help="Show more information.",
)
def main(config_path, chunk_size, debug):
"""Go over old messages and populate the msg_json field."""
config = get_config(config_path)
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO, format="%(message)s")
m.init(
config["datanommer_sqlalchemy_url"],
alembic_ini=config["alembic_ini"],
)

query = select(m.Message).where(m.Message.msg_json.is_(None))
first_message = m.session.scalars(query.order_by(m.Message.timestamp).limit(1)).first()
if first_message is None:
click.echo("No message to populate.")
return

for start_date, end_date in iterate_over_time(
first_message.timestamp, datetime.timedelta(days=chunk_size)
):
log.debug(
"Converting messages between %s and %s",
start_date.date().isoformat(),
end_date.date().isoformat(),
)
# Fill the msg_json column from the contents of the msg_raw column
query = (
update(m.Message)
.where(
m.Message.msg_json.is_(None),
m.Message.timestamp >= start_date,
m.Message.timestamp < end_date,
)
.values(msg_json=cast(m.Message.msg_raw, postgresql.JSONB(none_as_null=True)))
)
result = m.session.execute(query)
m.session.commit()
log.debug("Populated %s rows", result.rowcount)
# Empty the msg_raw column if msg_json is not filled
query = (
update(m.Message)
.where(
m.Message.msg_json.is_not(None),
m.Message.timestamp >= start_date,
m.Message.timestamp < end_date,
)
.values(msg_raw=None)
)
result = m.session.execute(query)
log.debug("Purged %s rows", result.rowcount)


def iterate_over_time(start_at, interval):
intervals = []
start_date = start_at
now = datetime.datetime.now()
while start_date < now:
end_date = start_date + interval
intervals.append((start_date, end_date))
start_date = end_date

total = len(intervals)
with click.progressbar(length=total) as bar:
for start_date, end_date in intervals:
yield start_date, end_date
m.session.commit()
m.session.expunge_all()
bar.update(1)
52 changes: 52 additions & 0 deletions datanommer.commands/datanommer/commands/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

import click
from fedora_messaging import config as fedora_messaging_config
from fedora_messaging.message import load_message as load_message
from sqlalchemy import func

import datanommer.models as m


# Go trough messages these many at a time
CHUNK_SIZE = 10000
log = logging.getLogger(__name__)


def get_config(config_path=None):
if config_path:
fedora_messaging_config.conf.load_config(config_path)
conf = fedora_messaging_config.conf["consumer_config"]
for key in ("datanommer_sqlalchemy_url", "alembic_ini"):
if key not in conf:
raise click.ClickException(f"{key} not defined in the fedora-messaging config")
return conf


config_option = click.option(
"-c",
"--config",
"config_path",
help="Load this Fedora Messaging config file",
type=click.Path(exists=True, readable=True),
)


def iterate_over_messages(query, chunk_size):
total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
if not total:
click.echo("No messages matched.")
return

click.echo(f"Considering {total} message{'s' if total > 1 else ''}")

query = query.order_by(m.Message.timestamp)
with click.progressbar(length=total) as bar:
for chunk in range(int(total / chunk_size) + 1):
offset = chunk * chunk_size
chunk_query = query.limit(chunk_size).offset(offset)
for message in m.session.scalars(chunk_query):
bar.update(1)
yield message
m.session.commit()
m.session.expunge_all()
1 change: 1 addition & 0 deletions datanommer.commands/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ datanommer-dump = "datanommer.commands:dump"
datanommer-stats = "datanommer.commands:stats"
datanommer-latest = "datanommer.commands:latest"
datanommer-extract-users = "datanommer.commands.extract_users:main"
datanommer-populate-json = "datanommer.commands.populate_json:main"


[build-system]
Expand Down
2 changes: 1 addition & 1 deletion datanommer.commands/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def mock_init(mocker):
@pytest.fixture
def mock_config(mocker):
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{
"datanommer_sqlalchemy_url": "",
"alembic_ini": None,
Expand Down
10 changes: 6 additions & 4 deletions datanommer.commands/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

def test_get_datanommer_sqlalchemy_url_keyerror(mocker):
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{},
clear=True,
)
Expand All @@ -42,9 +42,11 @@ def test_get_datanommer_sqlalchemy_url_config(mocker):
"datanommer_sqlalchemy_url": "",
"alembic_ini": "/some/where",
}
mocker.patch.dict(datanommer.commands.fedora_messaging_config.conf["consumer_config"], conf)
mocker.patch.dict(
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], conf
)
load_config = mocker.patch(
"datanommer.commands.fedora_messaging_config.conf.load_config",
"datanommer.commands.utils.fedora_messaging_config.conf.load_config",
)
datanommer.commands.get_config("some-path")
load_config.assert_called_with("some-path")
Expand All @@ -53,7 +55,7 @@ def test_get_datanommer_sqlalchemy_url_config(mocker):
def test_create(mocker):
mock_model_init = mocker.patch("datanommer.commands.m.init")
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{
"datanommer_sqlalchemy_url": "TESTURL",
"alembic_ini": "/some/where",
Expand Down
3 changes: 2 additions & 1 deletion datanommer.commands/tests/test_extract_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ def test_extract_force_schema(bodhi_message_db, mock_config, mock_init):


def test_extract_invalid_message(bodhi_message_db, mock_config, mock_init):
bodhi_message_db.msg = "this is invalid"
bodhi_message_db.msg_raw = "this is invalid"
bodhi_message_db.msg_json = None
m.session.commit()

runner = CliRunner()
Expand Down
52 changes: 52 additions & 0 deletions datanommer.commands/tests/test_populate_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from unittest.mock import Mock

import pytest
from click.testing import CliRunner

import datanommer.models as m
from datanommer.commands.populate_json import main as populate_json

from .utils import generate_bodhi_update_complete_message


@pytest.fixture
def bodhi_message_db(datanommer_models):
msg = generate_bodhi_update_complete_message()
m.add(msg)
msg_in_db = m.Message.from_msg_id(msg.id)
msg_in_db.msg_raw = msg_in_db.msg_json
msg_in_db.msg_json = None
m.session.commit()
m.session.refresh(msg_in_db)
assert msg_in_db.msg_json is None
return msg_in_db


@pytest.fixture(autouse=True)
def no_expunge(datanommer_models, monkeypatch):
monkeypatch.setattr(m.session, "expunge_all", Mock(name="expunge_all"))
monkeypatch.setattr(m.session, "expunge", Mock(name="expunge"))


def test_populate_json(bodhi_message_db, mock_config, mock_init):
runner = CliRunner()
result = runner.invoke(populate_json)

assert result.exit_code == 0, result.output

m.session.refresh(bodhi_message_db)
print(bodhi_message_db.msg_json)
assert bodhi_message_db.msg_json is not None
assert bodhi_message_db.msg_raw is None
total, _pages, _messages = m.Message.grep(jsons=['$.comment.user.name == "dudemcpants"'])
assert total == 1
assert _messages == [bodhi_message_db]


def test_populate_json_no_message(monkeypatch, mock_config, mock_init):
monkeypatch.setattr(m.session, "execute", Mock(name="execute"))
runner = CliRunner()
result = runner.invoke(populate_json)
assert result.exit_code == 0, result.output
assert result.output == "No message to populate.\n"
m.session.execute.assert_not_called()
Loading

0 comments on commit 0589b6f

Please sign in to comment.