Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a JSONB column to the database to make jsonpath queries #1312

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions datanommer.commands/datanommer/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,18 @@
from datetime import datetime, timedelta, timezone

import click
from fedora_messaging import config as fedora_messaging_config
from sqlalchemy import func, select

import datanommer.models as m

from .utils import config_option, get_config


__version__ = importlib.metadata.version("datanommer.commands")

log = logging.getLogger("datanommer")


def get_config(config_path=None):
if config_path:
fedora_messaging_config.conf.load_config(config_path)
conf = fedora_messaging_config.conf["consumer_config"]
for key in ("datanommer_sqlalchemy_url", "alembic_ini"):
if key not in conf:
raise click.ClickException(f"{key} not defined in the fedora-messaging config")
return conf


config_option = click.option(
"-c",
"--config",
"config_path",
help="Load this Fedora Messaging config file",
type=click.Path(exists=True, readable=True),
)


@click.command()
@config_option
def create(config_path):
Expand Down
43 changes: 13 additions & 30 deletions datanommer.commands/datanommer/commands/extract_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import click
from fedora_messaging.exceptions import ValidationError
from fedora_messaging.message import load_message as load_message
from sqlalchemy import and_, func, not_, select
from sqlalchemy import and_, not_, select

import datanommer.models as m

from . import config_option, get_config
from .utils import CHUNK_SIZE, config_option, get_config, iterate_over_messages


# Go trough messages these at a time
CHUNK_SIZE = 10000
log = logging.getLogger(__name__)

SKIP_TOPICS = [
Expand Down Expand Up @@ -111,32 +109,17 @@ def main(config_path, topic, category, start, end, force_schema, chunk_size, deb
isouter=True,
).where(m.users_assoc_table.c.msg_id.is_(None))

total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
if not total:
click.echo("No messages matched.")
return

click.echo(f"Considering {total} message{'s' if total > 1 else ''}")

query = query.order_by(m.Message.timestamp)
with click.progressbar(length=total) as bar:
for chunk in range(int(total / chunk_size) + 1):
offset = chunk * chunk_size
chunk_query = query.limit(chunk_size).offset(offset)
for message in m.session.scalars(chunk_query):
bar.update(1)
usernames = get_usernames(message, force_schema=force_schema)
if not usernames:
m.session.expunge(message)
continue
message._insert_list(m.User, m.users_assoc_table, usernames)
if debug:
click.echo(
f"Usernames for message {message.msg_id} of topic {message.topic}"
f": {', '.join(usernames)}"
)
m.session.commit()
m.session.expunge_all()
for message in iterate_over_messages(query, chunk_size):
usernames = get_usernames(message, force_schema=force_schema)
if not usernames:
m.session.expunge(message)
continue
message._insert_list(m.User, m.users_assoc_table, usernames)
if debug:
click.echo(
f"Usernames for message {message.msg_id} of topic {message.topic}"
f": {', '.join(usernames)}"
)


def get_usernames(db_message, force_schema):
Expand Down
96 changes: 96 additions & 0 deletions datanommer.commands/datanommer/commands/populate_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import datetime
import logging

import click
from fedora_messaging.message import load_message as load_message
from sqlalchemy import cast, select, update
from sqlalchemy.dialects import postgresql

import datanommer.models as m

from .utils import config_option, get_config


log = logging.getLogger(__name__)


@click.command()
@config_option
@click.option(
    "--chunk-size",
    type=int,
    default=30,
    show_default=True,
    help="Go through messages these many days at a time (lower is slower but saves memory).",
)
@click.option(
    "--debug",
    help="Show more information.",
    is_flag=True,
)
def main(config_path, chunk_size, debug):
    """Go over old messages and populate the msg_json field."""
    # NOTE: the docstring above doubles as the command's ``--help`` text.
    settings = get_config(config_path)
    log_level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(level=log_level, format="%(message)s")
    m.init(settings["datanommer_sqlalchemy_url"], alembic_ini=settings["alembic_ini"])

    # The oldest message still lacking a JSON payload tells us where to start.
    pending = select(m.Message).where(m.Message.msg_json.is_(None))
    oldest_pending = m.session.scalars(pending.order_by(m.Message.timestamp).limit(1)).first()
    if oldest_pending is None:
        click.echo("No message to populate.")
        return

    window_size = datetime.timedelta(days=chunk_size)
    for start_date, end_date in iterate_over_time(oldest_pending.timestamp, window_size):
        log.debug(
            "Converting messages between %s and %s",
            start_date.date().isoformat(),
            end_date.date().isoformat(),
        )
        # Both statements below are restricted to the half-open window
        # [start_date, end_date).
        in_window = (
            m.Message.timestamp >= start_date,
            m.Message.timestamp < end_date,
        )

        # Step 1: fill msg_json by casting the contents of msg_raw to JSONB.
        populate_stmt = (
            update(m.Message)
            .where(m.Message.msg_json.is_(None), *in_window)
            .values(msg_json=cast(m.Message.msg_raw, postgresql.JSONB(none_as_null=True)))
        )
        populated = m.session.execute(populate_stmt)
        m.session.commit()
        log.debug("Populated %s rows", populated.rowcount)

        # Step 2: empty msg_raw on the rows whose msg_json is now filled.
        # No commit here: iterate_over_time() commits when it is resumed.
        purge_stmt = (
            update(m.Message)
            .where(m.Message.msg_json.is_not(None), *in_window)
            .values(msg_raw=None)
        )
        purged = m.session.execute(purge_stmt)
        log.debug("Purged %s rows", purged.rowcount)


def iterate_over_time(start_at, interval):
    """Yield consecutive ``(start, end)`` windows from *start_at* up to now.

    The windows are contiguous and each one is *interval* long; the last
    window may extend past the current time. A click progress bar is advanced
    once per window, and after the consumer has finished with a window the
    session is committed and its objects are expunged.

    Args:
        start_at: datetime at which the first window starts.
        interval: positive ``datetime.timedelta``, the length of each window.

    Yields:
        Tuples ``(start_date, end_date)`` describing a half-open time window.

    Raises:
        ValueError: if *interval* is zero or negative. Such an interval would
            never move the window forward and the loop below would never end.
    """
    if interval <= datetime.timedelta(0):
        raise ValueError(f"interval must be a positive duration, got {interval!r}")

    # NOTE(review): now() returns a naive local-time datetime; this assumes
    # start_at is naive and expressed in the same timezone -- confirm against
    # how message timestamps are stored.
    now = datetime.datetime.now()

    # All windows are computed up front so the progress bar knows its length.
    intervals = []
    start_date = start_at
    while start_date < now:
        end_date = start_date + interval
        intervals.append((start_date, end_date))
        start_date = end_date

    with click.progressbar(length=len(intervals)) as bar:
        for start_date, end_date in intervals:
            yield start_date, end_date
            # Runs once the consumer asks for the next window: persist its
            # work and drop the loaded objects from the session.
            m.session.commit()
            m.session.expunge_all()
            bar.update(1)
52 changes: 52 additions & 0 deletions datanommer.commands/datanommer/commands/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

import click
from fedora_messaging import config as fedora_messaging_config
from fedora_messaging.message import load_message as load_message
from sqlalchemy import func

import datanommer.models as m


# Go through messages these many at a time
CHUNK_SIZE = 10000
log = logging.getLogger(__name__)


def get_config(config_path=None):
    """Return the ``consumer_config`` section of the Fedora Messaging config.

    Args:
        config_path: optional path of a Fedora Messaging configuration file to
            load before reading the section.

    Returns:
        The ``consumer_config`` mapping.

    Raises:
        click.ClickException: if ``datanommer_sqlalchemy_url`` or
            ``alembic_ini`` is missing from the section.
    """
    if config_path:
        fedora_messaging_config.conf.load_config(config_path)
    consumer_config = fedora_messaging_config.conf["consumer_config"]

    required_keys = ("datanommer_sqlalchemy_url", "alembic_ini")
    # Report the first required key that is absent, in declaration order.
    missing = next((name for name in required_keys if name not in consumer_config), None)
    if missing is not None:
        raise click.ClickException(f"{missing} not defined in the fedora-messaging config")
    return consumer_config


# Reusable ``-c`` / ``--config`` click option. The chosen path is passed to
# the decorated command as ``config_path`` and must be an existing, readable
# path.
config_option = click.option(
    "-c",
    "--config",
    "config_path",
    type=click.Path(exists=True, readable=True),
    help="Load this Fedora Messaging config file",
)


def iterate_over_messages(query, chunk_size):
    """Yield the messages selected by *query*, loading them chunk by chunk.

    The matching messages are counted first to size the progress bar. They
    are then fetched ordered by timestamp, *chunk_size* at a time. After each
    chunk has been consumed the session is committed and its objects are
    expunged.

    NOTE(review): chunks are addressed with LIMIT/OFFSET against a query that
    is re-run for every chunk. If the consumer's changes make already-seen
    rows stop matching *query* once committed, later offsets would skip rows.
    Confirm against the callers.

    Args:
        query: a select on ``m.Message``.
        chunk_size: positive number of messages to load per chunk.

    Yields:
        The messages returned by each chunk query, one at a time.

    Raises:
        ValueError: if *chunk_size* is not strictly positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
    if not total:
        click.echo("No messages matched.")
        return

    click.echo(f"Considering {total} message{'s' if total > 1 else ''}")

    query = query.order_by(m.Message.timestamp)
    with click.progressbar(length=total) as bar:
        # Stepping the offset directly uses exact integer arithmetic and does
        # not issue a trailing empty query when total is a multiple of
        # chunk_size.
        for offset in range(0, total, chunk_size):
            chunk_query = query.limit(chunk_size).offset(offset)
            for message in m.session.scalars(chunk_query):
                bar.update(1)
                yield message
            # Persist the consumer's work on this chunk and free the loaded
            # objects before fetching the next one.
            m.session.commit()
            m.session.expunge_all()
1 change: 1 addition & 0 deletions datanommer.commands/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ datanommer-dump = "datanommer.commands:dump"
datanommer-stats = "datanommer.commands:stats"
datanommer-latest = "datanommer.commands:latest"
datanommer-extract-users = "datanommer.commands.extract_users:main"
datanommer-populate-json = "datanommer.commands.populate_json:main"


[build-system]
Expand Down
2 changes: 1 addition & 1 deletion datanommer.commands/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def mock_init(mocker):
@pytest.fixture
def mock_config(mocker):
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{
"datanommer_sqlalchemy_url": "",
"alembic_ini": None,
Expand Down
10 changes: 6 additions & 4 deletions datanommer.commands/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

def test_get_datanommer_sqlalchemy_url_keyerror(mocker):
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{},
clear=True,
)
Expand All @@ -42,9 +42,11 @@ def test_get_datanommer_sqlalchemy_url_config(mocker):
"datanommer_sqlalchemy_url": "",
"alembic_ini": "/some/where",
}
mocker.patch.dict(datanommer.commands.fedora_messaging_config.conf["consumer_config"], conf)
mocker.patch.dict(
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], conf
)
load_config = mocker.patch(
"datanommer.commands.fedora_messaging_config.conf.load_config",
"datanommer.commands.utils.fedora_messaging_config.conf.load_config",
)
datanommer.commands.get_config("some-path")
load_config.assert_called_with("some-path")
Expand All @@ -53,7 +55,7 @@ def test_get_datanommer_sqlalchemy_url_config(mocker):
def test_create(mocker):
mock_model_init = mocker.patch("datanommer.commands.m.init")
mocker.patch.dict(
datanommer.commands.fedora_messaging_config.conf["consumer_config"],
datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
{
"datanommer_sqlalchemy_url": "TESTURL",
"alembic_ini": "/some/where",
Expand Down
3 changes: 2 additions & 1 deletion datanommer.commands/tests/test_extract_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ def test_extract_force_schema(bodhi_message_db, mock_config, mock_init):


def test_extract_invalid_message(bodhi_message_db, mock_config, mock_init):
bodhi_message_db.msg = "this is invalid"
bodhi_message_db.msg_raw = "this is invalid"
bodhi_message_db.msg_json = None
m.session.commit()

runner = CliRunner()
Expand Down
52 changes: 52 additions & 0 deletions datanommer.commands/tests/test_populate_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from unittest.mock import Mock

import pytest
from click.testing import CliRunner

import datanommer.models as m
from datanommer.commands.populate_json import main as populate_json

from .utils import generate_bodhi_update_complete_message


@pytest.fixture
def bodhi_message_db(datanommer_models):
    """Store a Bodhi message whose payload sits in msg_raw and not in msg_json.

    This is the state of a row that the populate-json command has yet to
    convert.
    """
    bodhi_msg = generate_bodhi_update_complete_message()
    m.add(bodhi_msg)
    stored = m.Message.from_msg_id(bodhi_msg.id)
    # Move the payload over to msg_raw and blank out msg_json.
    stored.msg_raw, stored.msg_json = stored.msg_json, None
    m.session.commit()
    m.session.refresh(stored)
    assert stored.msg_json is None
    return stored


@pytest.fixture(autouse=True)
def no_expunge(datanommer_models, monkeypatch):
    """Replace the session's expunge methods with mocks for every test."""
    for method_name in ("expunge_all", "expunge"):
        monkeypatch.setattr(m.session, method_name, Mock(name=method_name))


def test_populate_json(bodhi_message_db, mock_config, mock_init):
    """The command fills msg_json, empties msg_raw, and the row is JSON-searchable."""
    runner = CliRunner()
    result = runner.invoke(populate_json)

    assert result.exit_code == 0, result.output

    m.session.refresh(bodhi_message_db)
    assert bodhi_message_db.msg_json is not None
    assert bodhi_message_db.msg_raw is None
    # The migrated payload must be reachable through a jsonpath filter.
    total, _pages, messages = m.Message.grep(jsons=['$.comment.user.name == "dudemcpants"'])
    assert total == 1
    assert messages == [bodhi_message_db]


def test_populate_json_no_message(monkeypatch, mock_config, mock_init):
    """With nothing to convert, the command says so and never calls execute."""
    execute_mock = Mock(name="execute")
    monkeypatch.setattr(m.session, "execute", execute_mock)

    result = CliRunner().invoke(populate_json)

    assert result.exit_code == 0, result.output
    assert result.output == "No message to populate.\n"
    execute_mock.assert_not_called()
Loading