diff --git a/datanommer.commands/datanommer/commands/__init__.py b/datanommer.commands/datanommer/commands/__init__.py
index 481b39bdc..b72fa690e 100644
--- a/datanommer.commands/datanommer/commands/__init__.py
+++ b/datanommer.commands/datanommer/commands/__init__.py
@@ -21,36 +21,18 @@ from datetime import datetime, timedelta, timezone
 
 import click
-from fedora_messaging import config as fedora_messaging_config
 from sqlalchemy import func, select
 
 import datanommer.models as m
 
+from .utils import config_option, get_config
+
 __version__ = importlib.metadata.version("datanommer.commands")
 
 log = logging.getLogger("datanommer")
 
 
-def get_config(config_path=None):
-    if config_path:
-        fedora_messaging_config.conf.load_config(config_path)
-    conf = fedora_messaging_config.conf["consumer_config"]
-    for key in ("datanommer_sqlalchemy_url", "alembic_ini"):
-        if key not in conf:
-            raise click.ClickException(f"{key} not defined in the fedora-messaging config")
-    return conf
-
-
-config_option = click.option(
-    "-c",
-    "--config",
-    "config_path",
-    help="Load this Fedora Messaging config file",
-    type=click.Path(exists=True, readable=True),
-)
-
-
 @click.command()
 @config_option
 def create(config_path):
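The helpers deleted here move verbatim into the new `datanommer/commands/utils.py` below, so every subcommand shares one implementation. As a quick sketch of how a subcommand is expected to wire them up (the `my_command` name is hypothetical, not part of this patch):

```python
import click

import datanommer.models as m
from datanommer.commands.utils import config_option, get_config


@click.command()
@config_option
def my_command(config_path):
    """Hypothetical subcommand wired up like `create` above."""
    # get_config() raises click.ClickException if a required key is missing
    config = get_config(config_path)
    m.init(
        config["datanommer_sqlalchemy_url"],
        alembic_ini=config["alembic_ini"],
    )
```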
diff --git a/datanommer.commands/datanommer/commands/extract_users.py b/datanommer.commands/datanommer/commands/extract_users.py
index 63855c89c..238a601ae 100644
--- a/datanommer.commands/datanommer/commands/extract_users.py
+++ b/datanommer.commands/datanommer/commands/extract_users.py
@@ -4,15 +4,13 @@
 import click
 from fedora_messaging.exceptions import ValidationError
 from fedora_messaging.message import load_message as load_message
-from sqlalchemy import and_, func, not_, select
+from sqlalchemy import and_, not_, select
 
 import datanommer.models as m
 
-from . import config_option, get_config
+from .utils import CHUNK_SIZE, config_option, get_config, iterate_over_messages
 
-# Go trough messages these at a time
-CHUNK_SIZE = 10000
 
 log = logging.getLogger(__name__)
 
 SKIP_TOPICS = [
@@ -111,32 +109,17 @@ def main(config_path, topic, category, start, end, force_schema, chunk_size, debug):
         isouter=True,
     ).where(m.users_assoc_table.c.msg_id.is_(None))
 
-    total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
-    if not total:
-        click.echo("No messages matched.")
-        return
-
-    click.echo(f"Considering {total} message{'s' if total > 1 else ''}")
-
-    query = query.order_by(m.Message.timestamp)
-    with click.progressbar(length=total) as bar:
-        for chunk in range(int(total / chunk_size) + 1):
-            offset = chunk * chunk_size
-            chunk_query = query.limit(chunk_size).offset(offset)
-            for message in m.session.scalars(chunk_query):
-                bar.update(1)
-                usernames = get_usernames(message, force_schema=force_schema)
-                if not usernames:
-                    m.session.expunge(message)
-                    continue
-                message._insert_list(m.User, m.users_assoc_table, usernames)
-                if debug:
-                    click.echo(
-                        f"Usernames for message {message.msg_id} of topic {message.topic}"
-                        f": {', '.join(usernames)}"
-                    )
-            m.session.commit()
-            m.session.expunge_all()
+    for message in iterate_over_messages(query, chunk_size):
+        usernames = get_usernames(message, force_schema=force_schema)
+        if not usernames:
+            m.session.expunge(message)
+            continue
+        message._insert_list(m.User, m.users_assoc_table, usernames)
+        if debug:
+            click.echo(
+                f"Usernames for message {message.msg_id} of topic {message.topic}"
+                f": {', '.join(usernames)}"
+            )
 
 
 def get_usernames(db_message, force_schema):
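The inline chunking loop above is replaced by the `iterate_over_messages` generator added in `utils.py` below. One subtlety: the commit and expunge happen inside the generator, but only after the caller's loop body has processed every message in the chunk, because control returns to the generator between yields. A toy illustration of that interleaving, with stand-in names and no database:

```python
def iterate_in_chunks(items, chunk_size, commit):
    """Yield items one at a time; run commit() after each chunk is consumed."""
    for start in range(0, len(items), chunk_size):
        yield from items[start : start + chunk_size]
        commit()  # runs only once the caller has handled the whole chunk


events = []
chunked = iterate_in_chunks(list(range(7)), 3, commit=lambda: events.append("commit"))
for item in chunked:
    events.append(item)
print(events)  # [0, 1, 2, 'commit', 3, 4, 5, 'commit', 6, 'commit']
```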
diff --git a/datanommer.commands/datanommer/commands/populate_json.py b/datanommer.commands/datanommer/commands/populate_json.py
new file mode 100644
index 000000000..ccb90e16d
--- /dev/null
+++ b/datanommer.commands/datanommer/commands/populate_json.py
@@ -0,0 +1,96 @@
+import datetime
+import logging
+
+import click
+from fedora_messaging.message import load_message as load_message
+from sqlalchemy import cast, select, update
+from sqlalchemy.dialects import postgresql
+
+import datanommer.models as m
+
+from .utils import config_option, get_config
+
+
+log = logging.getLogger(__name__)
+
+
+@click.command()
+@config_option
+@click.option(
+    "--chunk-size",
+    default=30,
+    type=int,
+    show_default=True,
+    help="Go through messages this many days at a time (lower is slower but saves memory).",
+)
+@click.option(
+    "--debug",
+    is_flag=True,
+    help="Show more information.",
+)
+def main(config_path, chunk_size, debug):
+    """Go over old messages and populate the msg_json field."""
+    config = get_config(config_path)
+    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO, format="%(message)s")
+    m.init(
+        config["datanommer_sqlalchemy_url"],
+        alembic_ini=config["alembic_ini"],
+    )
+
+    query = select(m.Message).where(m.Message.msg_json.is_(None))
+    first_message = m.session.scalars(query.order_by(m.Message.timestamp).limit(1)).first()
+    if first_message is None:
+        click.echo("No message to populate.")
+        return
+
+    for start_date, end_date in iterate_over_time(
+        first_message.timestamp, datetime.timedelta(days=chunk_size)
+    ):
+        log.debug(
+            "Converting messages between %s and %s",
+            start_date.date().isoformat(),
+            end_date.date().isoformat(),
+        )
+        # Fill the msg_json column from the contents of the msg_raw column
+        query = (
+            update(m.Message)
+            .where(
+                m.Message.msg_json.is_(None),
+                m.Message.timestamp >= start_date,
+                m.Message.timestamp < end_date,
+            )
+            .values(msg_json=cast(m.Message.msg_raw, postgresql.JSONB(none_as_null=True)))
+        )
+        result = m.session.execute(query)
+        m.session.commit()
+        log.debug("Populated %s rows", result.rowcount)
+        # Empty the msg_raw column where msg_json is now filled
+        query = (
+            update(m.Message)
+            .where(
+                m.Message.msg_json.is_not(None),
+                m.Message.timestamp >= start_date,
+                m.Message.timestamp < end_date,
+            )
+            .values(msg_raw=None)
+        )
+        result = m.session.execute(query)
+        log.debug("Purged %s rows", result.rowcount)
+
+
+def iterate_over_time(start_at, interval):
+    intervals = []
+    start_date = start_at
+    now = datetime.datetime.now()
+    while start_date < now:
+        end_date = start_date + interval
+        intervals.append((start_date, end_date))
+        start_date = end_date
+
+    total = len(intervals)
+    with click.progressbar(length=total) as bar:
+        for start_date, end_date in intervals:
+            yield start_date, end_date
+            m.session.commit()
+            m.session.expunge_all()
+            bar.update(1)
diff --git a/datanommer.commands/datanommer/commands/utils.py b/datanommer.commands/datanommer/commands/utils.py
new file mode 100644
index 000000000..1b623b367
--- /dev/null
+++ b/datanommer.commands/datanommer/commands/utils.py
@@ -0,0 +1,52 @@
+import logging
+
+import click
+from fedora_messaging import config as fedora_messaging_config
+from fedora_messaging.message import load_message as load_message
+from sqlalchemy import func
+
+import datanommer.models as m
+
+
+# Go through messages this many at a time
+CHUNK_SIZE = 10000
+log = logging.getLogger(__name__)
+
+
+def get_config(config_path=None):
+    if config_path:
+        fedora_messaging_config.conf.load_config(config_path)
+    conf = fedora_messaging_config.conf["consumer_config"]
+    for key in ("datanommer_sqlalchemy_url", "alembic_ini"):
+        if key not in conf:
+            raise click.ClickException(f"{key} not defined in the fedora-messaging config")
+    return conf
+
+
+config_option = click.option(
+    "-c",
+    "--config",
+    "config_path",
+    help="Load this Fedora Messaging config file",
+    type=click.Path(exists=True, readable=True),
+)
+
+
+def iterate_over_messages(query, chunk_size):
+    total = m.session.scalar(query.with_only_columns(func.count(m.Message.id)))
+    if not total:
+        click.echo("No messages matched.")
+        return
+
+    click.echo(f"Considering {total} message{'s' if total > 1 else ''}")
+
+    query = query.order_by(m.Message.timestamp)
+    with click.progressbar(length=total) as bar:
+        for chunk in range(int(total / chunk_size) + 1):
+            offset = chunk * chunk_size
+            chunk_query = query.limit(chunk_size).offset(offset)
+            for message in m.session.scalars(chunk_query):
+                bar.update(1)
+                yield message
+            m.session.commit()
+            m.session.expunge_all()
diff --git a/datanommer.commands/pyproject.toml b/datanommer.commands/pyproject.toml
index f5a6c317d..06ba3b225 100644
--- a/datanommer.commands/pyproject.toml
+++ b/datanommer.commands/pyproject.toml
@@ -43,6 +43,7 @@ datanommer-dump = "datanommer.commands:dump"
 datanommer-stats = "datanommer.commands:stats"
 datanommer-latest = "datanommer.commands:latest"
 datanommer-extract-users = "datanommer.commands.extract_users:main"
+datanommer-populate-json = "datanommer.commands.populate_json:main"
 
 
 [build-system]
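A note on `iterate_over_time()` in `populate_json.py` above: the windows are half-open, matching the `>= start_date` / `< end_date` bounds of the UPDATE statements, and the last window may extend past the current time, which is harmless because the UPDATEs are also filtered on `msg_json`. A standalone re-implementation of the windowing, for illustration only:

```python
import datetime


def windows(start_at, interval, now):
    # Half-open windows [start, start + interval); the last one may pass `now`.
    start = start_at
    while start < now:
        yield start, start + interval
        start += interval


start = datetime.datetime(2024, 1, 1)
now = datetime.datetime(2024, 3, 15)
for lo, hi in windows(start, datetime.timedelta(days=30), now):
    print(lo.date(), "->", hi.date())
# 2024-01-01 -> 2024-01-31
# 2024-01-31 -> 2024-03-01
# 2024-03-01 -> 2024-03-31
```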
diff --git a/datanommer.commands/tests/conftest.py b/datanommer.commands/tests/conftest.py
index 4c046f604..54f7e7d86 100644
--- a/datanommer.commands/tests/conftest.py
+++ b/datanommer.commands/tests/conftest.py
@@ -16,7 +16,7 @@ def mock_init(mocker):
 @pytest.fixture
 def mock_config(mocker):
     mocker.patch.dict(
-        datanommer.commands.fedora_messaging_config.conf["consumer_config"],
+        datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
         {
             "datanommer_sqlalchemy_url": "",
             "alembic_ini": None,
diff --git a/datanommer.commands/tests/test_commands.py b/datanommer.commands/tests/test_commands.py
index 58c55515c..a532d2631 100644
--- a/datanommer.commands/tests/test_commands.py
+++ b/datanommer.commands/tests/test_commands.py
@@ -29,7 +29,7 @@
 
 def test_get_datanommer_sqlalchemy_url_keyerror(mocker):
     mocker.patch.dict(
-        datanommer.commands.fedora_messaging_config.conf["consumer_config"],
+        datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
         {},
         clear=True,
     )
@@ -42,9 +42,11 @@ def test_get_datanommer_sqlalchemy_url_config(mocker):
         "datanommer_sqlalchemy_url": "",
         "alembic_ini": "/some/where",
     }
-    mocker.patch.dict(datanommer.commands.fedora_messaging_config.conf["consumer_config"], conf)
+    mocker.patch.dict(
+        datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], conf
+    )
     load_config = mocker.patch(
-        "datanommer.commands.fedora_messaging_config.conf.load_config",
+        "datanommer.commands.utils.fedora_messaging_config.conf.load_config",
     )
     datanommer.commands.get_config("some-path")
     load_config.assert_called_with("some-path")
@@ -53,7 +55,7 @@ def test_create(mocker):
     mock_model_init = mocker.patch("datanommer.commands.m.init")
     mocker.patch.dict(
-        datanommer.commands.fedora_messaging_config.conf["consumer_config"],
+        datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
         {
             "datanommer_sqlalchemy_url": "TESTURL",
             "alembic_ini": "/some/where",
diff --git a/datanommer.commands/tests/test_extract_users.py b/datanommer.commands/tests/test_extract_users.py
index a3b4a0482..f5590f57b 100644
--- a/datanommer.commands/tests/test_extract_users.py
+++ b/datanommer.commands/tests/test_extract_users.py
@@ -171,7 +171,8 @@ def test_extract_force_schema(bodhi_message_db, mock_config, mock_init):
 
 
 def test_extract_invalid_message(bodhi_message_db, mock_config, mock_init):
-    bodhi_message_db.msg = "this is invalid"
+    bodhi_message_db.msg_raw = "this is invalid"
+    bodhi_message_db.msg_json = None
     m.session.commit()
 
     runner = CliRunner()
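The patch-target updates in these tests follow the standard mocking rule: patch the name where it is looked up, not where it was first defined. `get_config` now reads `fedora_messaging_config` through `datanommer.commands.utils`, so that is the namespace the tests must patch. A minimal sketch, assuming pytest-mock's `mocker` fixture:

```python
import datanommer.commands.utils


def test_get_config_reads_patched_conf(mocker):
    # Patch the dict in the module that get_config() actually consults.
    mocker.patch.dict(
        datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"],
        {"datanommer_sqlalchemy_url": "sqlite://", "alembic_ini": "/tmp/alembic.ini"},
    )
    conf = datanommer.commands.utils.get_config()
    assert conf["datanommer_sqlalchemy_url"] == "sqlite://"
```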
diff --git a/datanommer.commands/tests/test_populate_json.py b/datanommer.commands/tests/test_populate_json.py
new file mode 100644
index 000000000..e0559f2f2
--- /dev/null
+++ b/datanommer.commands/tests/test_populate_json.py
@@ -0,0 +1,52 @@
+from unittest.mock import Mock
+
+import pytest
+from click.testing import CliRunner
+
+import datanommer.models as m
+from datanommer.commands.populate_json import main as populate_json
+
+from .utils import generate_bodhi_update_complete_message
+
+
+@pytest.fixture
+def bodhi_message_db(datanommer_models):
+    msg = generate_bodhi_update_complete_message()
+    m.add(msg)
+    msg_in_db = m.Message.from_msg_id(msg.id)
+    msg_in_db.msg_raw = msg_in_db.msg_json
+    msg_in_db.msg_json = None
+    m.session.commit()
+    m.session.refresh(msg_in_db)
+    assert msg_in_db.msg_json is None
+    return msg_in_db
+
+
+@pytest.fixture(autouse=True)
+def no_expunge(datanommer_models, monkeypatch):
+    monkeypatch.setattr(m.session, "expunge_all", Mock(name="expunge_all"))
+    monkeypatch.setattr(m.session, "expunge", Mock(name="expunge"))
+
+
+def test_populate_json(bodhi_message_db, mock_config, mock_init):
+    runner = CliRunner()
+    result = runner.invoke(populate_json)
+
+    assert result.exit_code == 0, result.output
+
+    m.session.refresh(bodhi_message_db)
+    print(bodhi_message_db.msg_json)
+    assert bodhi_message_db.msg_json is not None
+    assert bodhi_message_db.msg_raw is None
+    total, _pages, _messages = m.Message.grep(jsons=['$.comment.user.name == "dudemcpants"'])
+    assert total == 1
+    assert _messages == [bodhi_message_db]
+
+
+def test_populate_json_no_message(monkeypatch, mock_config, mock_init):
+    monkeypatch.setattr(m.session, "execute", Mock(name="execute"))
+    runner = CliRunner()
+    result = runner.invoke(populate_json)
+    assert result.exit_code == 0, result.output
+    assert result.output == "No message to populate.\n"
+    m.session.execute.assert_not_called()
diff --git a/datanommer.models/datanommer/models/__init__.py b/datanommer.models/datanommer/models/__init__.py
index fcf8309f0..1460f5464 100644
--- a/datanommer.models/datanommer/models/__init__.py
+++ b/datanommer.models/datanommer/models/__init__.py
@@ -25,6 +25,7 @@
 from sqlalchemy import (
     and_,
     between,
+    cast,
     Column,
     create_engine,
     DateTime,
@@ -32,6 +33,7 @@ from sqlalchemy import (
     event,
     ForeignKey,
     func,
+    Index,
     Integer,
     not_,
     or_,
@@ -156,7 +158,7 @@ def add(message):
         msg_id=message.id,
         topic=message.topic,
         timestamp=sent_at,
-        msg=message.body,
+        msg_json=message.body,
         headers=headers,
         users=usernames,
         packages=packages,
@@ -213,7 +215,15 @@ def coerce_compared_value(self, op, value):
 
 class Message(DeclarativeBase):
     __tablename__ = "messages"
-    __table_args__ = (UniqueConstraint("msg_id", "timestamp"),)
+    __table_args__ = (
+        UniqueConstraint("msg_id", "timestamp"),
+        Index(
+            "json_root",
+            "msg_json",
+            postgresql_using="gin",
+            postgresql_ops={"msg_json": "jsonb_path_ops"},
+        ),
+    )
 
     id = Column(Integer, primary_key=True, autoincrement=True)
     msg_id = Column(Unicode, nullable=True, default=None, index=True)
@@ -227,7 +237,8 @@ class Message(DeclarativeBase):
     crypto = Column(UnicodeText)
     source_name = Column(Unicode, default="datanommer")
     source_version = Column(Unicode, default=lambda context: __version__)
-    msg = Column(_JSONEncodedDict, nullable=False)
+    msg_raw = Column(_JSONEncodedDict, nullable=True)
+    msg_json = Column(postgresql.JSONB(none_as_null=True), nullable=True)
     headers = Column(postgresql.JSONB(none_as_null=True))
     users = relationship(
         "User",
@@ -248,6 +259,10 @@ class Message(DeclarativeBase):
         ),
     )
 
+    @property
+    def msg(self):
+        return self.msg_json if self.msg_json is not None else self.msg_raw
+
     @validates("topic")
     def get_category(self, key, topic):
         """Update the category when the topic is set.
@@ -373,6 +388,8 @@ def make_query(
         topics=None,
         not_topics=None,
         contains=None,
+        jsons=None,
+        jsons_and=None,
     ):
         """Flexible query interface for messages.
 
@@ -400,6 +417,16 @@
             (user == 'ralph') AND NOT
             (category == 'bodhi' OR category == 'wiki')
 
+
+        ----
+
+        The ``jsons`` argument is a list of jsonpath filters; please refer to
+        `PostgreSQL's documentation
+        <https://www.postgresql.org/docs/current/functions-json.html>`_
+        to learn how to build the jsonpath expression.
+
+        The ``jsons_and`` argument is similar to the ``jsons`` argument, but all
+        the values must match for a message to be returned.
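The model changes below keep the old read API working: a read-only `msg` property prefers the new JSONB column and falls back to the legacy raw column, so callers such as the tests' `r[0].msg` keep working during and after the backfill. Its behavior, sketched with a stand-in class:

```python
class FakeMessage:
    """Stand-in for datanommer.models.Message, for illustration only."""

    def __init__(self, msg_raw=None, msg_json=None):
        self.msg_raw = msg_raw
        self.msg_json = msg_json

    @property
    def msg(self):
        return self.msg_json if self.msg_json is not None else self.msg_raw


assert FakeMessage(msg_json={"a": 1}).msg == {"a": 1}  # migrated row
assert FakeMessage(msg_raw={"b": 2}).msg == {"b": 2}   # not yet migrated
assert FakeMessage(msg_raw={"b": 2}, msg_json={"a": 1}).msg == {"a": 1}
```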
""" users = users or [] @@ -411,6 +438,8 @@ def make_query( topics = topics or [] not_topics = not_topics or [] contains = contains or [] + jsons = jsons or [] + jsons_and = jsons_and or [] Message = cls query = select(Message) @@ -442,7 +471,19 @@ def make_query( query = query.where(or_(*(Message.topic == topic for topic in topics))) if contains: - query = query.where(or_(*(Message.msg.like(f"%{contain}%") for contain in contains))) + query = query.where( + or_( + *( + cast(Message.msg_json, UnicodeText).like(f"%{contain}%") + for contain in contains + ) + ) + ) + + if jsons: + query = query.where(or_(*(Message.msg_json.path_match(j) for j in jsons))) + if jsons_and: + query = query.where(and_(*(Message.msg_json.path_match(j) for j in jsons_and))) # And then the four negative filters as necessary if not_users: @@ -507,6 +548,11 @@ def grep( The ``jsons_and`` argument is similar to the ``jsons`` argument, but all the values must match for a message to be returned. + + ---- + + If the `defer` argument evaluates to True, the query won't actually + be executed, but a SQLAlchemy query object returned instead. """ query = cls.make_query(**kwargs) # Finally, tag on our pagination arguments diff --git a/datanommer.models/datanommer/models/alembic/versions/0ce25211a227_message_msg_raw.py b/datanommer.models/datanommer/models/alembic/versions/0ce25211a227_message_msg_raw.py new file mode 100644 index 000000000..a8d3d1497 --- /dev/null +++ b/datanommer.models/datanommer/models/alembic/versions/0ce25211a227_message_msg_raw.py @@ -0,0 +1,25 @@ +"""Rename Message.msg to Message.msg_raw. + +This is backwards-incompatible. + +Revision ID: 0ce25211a227 +Revises: f6d590f5c53f +Create Date: 2024-05-16 14:47:47.180323 + +""" + +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "0ce25211a227" +down_revision = "f6d590f5c53f" + + +def upgrade(): + # Rename the msg column to msg_raw + op.alter_column("messages", "msg", new_column_name="msg_raw") + + +def downgrade(): + op.alter_column("messages", "msg_raw", new_column_name="msg") diff --git a/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py b/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py new file mode 100644 index 000000000..2b5972a1c --- /dev/null +++ b/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py @@ -0,0 +1,29 @@ +"""Messages.headers index + +Revision ID: f6918385051f +Revises: 951c40020acc +Create Date: 2024-05-07 16:05:05.344863 + +""" + +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "f6918385051f" +down_revision = "951c40020acc" + + +def upgrade(): + op.create_index( + "ix_messages_headers", + "messages", + ["headers"], + unique=False, + postgresql_using="gin", + postgresql_ops={"msg_json": "jsonb_path_ops"}, + ) + + +def downgrade(): + op.drop_index("ix_messages_headers", table_name="messages", postgresql_using="gin") diff --git a/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py b/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py new file mode 100644 index 000000000..40d3d53d0 --- /dev/null +++ b/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py @@ -0,0 +1,41 @@ +"""Create the Message.msg_json column and index it. 
diff --git a/datanommer.models/news/PR1312.feature b/datanommer.models/news/PR1312.feature
new file mode 100644
index 000000000..d7a4fc74f
--- /dev/null
+++ b/datanommer.models/news/PR1312.feature
@@ -0,0 +1 @@
+Add a JSONB column to the database to enable jsonpath queries
\ No newline at end of file
diff --git a/datanommer.models/tests/test_model.py b/datanommer.models/tests/test_model.py
index 786a588e8..f93fac797 100644
--- a/datanommer.models/tests/test_model.py
+++ b/datanommer.models/tests/test_model.py
@@ -428,6 +428,44 @@ def test_grep_contains(datanommer_models):
     assert r[0].msg == example_message.body
 
 
+def test_grep_jsons(datanommer_models):
+    example_message = generate_bodhi_update_complete_message()
+    dm.add(example_message)
+    dm.session.flush()
+    t, p, r = dm.Message.grep(jsons=['$.comment.update.status == "pending"'])
+    assert t == 1
+    assert p == 1
+    assert len(r) == 1
+    assert r[0].msg == example_message.body
+
+
+def test_grep_jsons_and_all_match(datanommer_models):
+    example_message = generate_bodhi_update_complete_message()
+    dm.add(example_message)
+    dm.session.flush()
+    t, p, r = dm.Message.grep(
+        jsons_and=['$.comment.update.status == "pending"', '$.comment.user.name == "dudemcpants"']
+    )
+    assert t == 1
+    assert p == 1
+    assert len(r) == 1
+    assert r[0].msg == example_message.body
+
+
+def test_grep_jsons_and_one_match(datanommer_models):
+    example_message = generate_bodhi_update_complete_message()
+    dm.add(example_message)
+    dm.session.flush()
+    t, p, r = dm.Message.grep(
+        jsons_and=[
+            '$.comment.update.status == "pending"',
+            '$.comment.user.name == "does-not-match"',
+        ]
+    )
+    assert t == 0
+    assert len(r) == 0
+
+
 def test_grep_rows_per_page(datanommer_models, add_200_messages):
     total, pages, messages = dm.Message.grep()
     assert total == 200
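Finally, a usage sketch of the new jsonpath filters exercised by the tests above: `jsons` ORs its expressions together, while `jsons_and` requires every expression to match. This assumes `dm.init(...)` has already been called against a PostgreSQL database, since `path_match` compiles to the JSONB `@@` operator:

```python
import datanommer.models as dm

# Any expression may match (OR semantics):
total, pages, messages = dm.Message.grep(
    jsons=['$.comment.update.status == "pending"']
)

# Every expression must match (AND semantics):
total, pages, messages = dm.Message.grep(
    jsons_and=[
        '$.comment.update.status == "pending"',
        '$.comment.user.name == "dudemcpants"',
    ]
)
```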