From 79752c380403f4f90bcc7319bf6158ec243b1564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Thu, 2 May 2024 17:29:56 +0200 Subject: [PATCH] Add a JSONB column to the database to make jsonpath queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Aurélien Bompard --- .../datanommer/commands/__init__.py | 22 +---- .../datanommer/commands/extract_users.py | 43 +++------ .../datanommer/commands/populate_json.py | 91 +++++++++++++++++++ .../datanommer/commands/utils.py | 52 +++++++++++ datanommer.commands/pyproject.toml | 1 + datanommer.commands/tests/conftest.py | 2 +- datanommer.commands/tests/test_commands.py | 10 +- .../tests/test_populate_json.py | 43 +++++++++ .../datanommer/models/__init__.py | 45 ++++++++- .../f6918385051f_messages_headers_index.py | 29 ++++++ .../versions/f6d590f5c53f_message_msg_json.py | 43 +++++++++ datanommer.models/news/PR1312.feature | 1 + datanommer.models/tests/test_model.py | 38 ++++++++ 13 files changed, 360 insertions(+), 60 deletions(-) create mode 100644 datanommer.commands/datanommer/commands/populate_json.py create mode 100644 datanommer.commands/datanommer/commands/utils.py create mode 100644 datanommer.commands/tests/test_populate_json.py create mode 100644 datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py create mode 100644 datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py create mode 100644 datanommer.models/news/PR1312.feature diff --git a/datanommer.commands/datanommer/commands/__init__.py b/datanommer.commands/datanommer/commands/__init__.py index 481b39bdc..b72fa690e 100644 --- a/datanommer.commands/datanommer/commands/__init__.py +++ b/datanommer.commands/datanommer/commands/__init__.py @@ -21,36 +21,18 @@ from datetime import datetime, timedelta, timezone import click -from fedora_messaging import config as fedora_messaging_config from sqlalchemy import func, 
select import datanommer.models as m +from .utils import config_option, get_config + __version__ = importlib.metadata.version("datanommer.commands") log = logging.getLogger("datanommer") -def get_config(config_path=None): - if config_path: - fedora_messaging_config.conf.load_config(config_path) - conf = fedora_messaging_config.conf["consumer_config"] - for key in ("datanommer_sqlalchemy_url", "alembic_ini"): - if key not in conf: - raise click.ClickException(f"{key} not defined in the fedora-messaging config") - return conf - - -config_option = click.option( - "-c", - "--config", - "config_path", - help="Load this Fedora Messaging config file", - type=click.Path(exists=True, readable=True), -) - - @click.command() @config_option def create(config_path): diff --git a/datanommer.commands/datanommer/commands/extract_users.py b/datanommer.commands/datanommer/commands/extract_users.py index 63855c89c..238a601ae 100644 --- a/datanommer.commands/datanommer/commands/extract_users.py +++ b/datanommer.commands/datanommer/commands/extract_users.py @@ -4,15 +4,13 @@ import click from fedora_messaging.exceptions import ValidationError from fedora_messaging.message import load_message as load_message -from sqlalchemy import and_, func, not_, select +from sqlalchemy import and_, not_, select import datanommer.models as m -from . 
import config_option, get_config +from .utils import CHUNK_SIZE, config_option, get_config, iterate_over_messages -# Go trough messages these at a time -CHUNK_SIZE = 10000 log = logging.getLogger(__name__) SKIP_TOPICS = [ @@ -111,32 +109,17 @@ def main(config_path, topic, category, start, end, force_schema, chunk_size, deb isouter=True, ).where(m.users_assoc_table.c.msg_id.is_(None)) - total = m.session.scalar(query.with_only_columns(func.count(m.Message.id))) - if not total: - click.echo("No messages matched.") - return - - click.echo(f"Considering {total} message{'s' if total > 1 else ''}") - - query = query.order_by(m.Message.timestamp) - with click.progressbar(length=total) as bar: - for chunk in range(int(total / chunk_size) + 1): - offset = chunk * chunk_size - chunk_query = query.limit(chunk_size).offset(offset) - for message in m.session.scalars(chunk_query): - bar.update(1) - usernames = get_usernames(message, force_schema=force_schema) - if not usernames: - m.session.expunge(message) - continue - message._insert_list(m.User, m.users_assoc_table, usernames) - if debug: - click.echo( - f"Usernames for message {message.msg_id} of topic {message.topic}" - f": {', '.join(usernames)}" - ) - m.session.commit() - m.session.expunge_all() + for message in iterate_over_messages(query, chunk_size): + usernames = get_usernames(message, force_schema=force_schema) + if not usernames: + m.session.expunge(message) + continue + message._insert_list(m.User, m.users_assoc_table, usernames) + if debug: + click.echo( + f"Usernames for message {message.msg_id} of topic {message.topic}" + f": {', '.join(usernames)}" + ) def get_usernames(db_message, force_schema): diff --git a/datanommer.commands/datanommer/commands/populate_json.py b/datanommer.commands/datanommer/commands/populate_json.py new file mode 100644 index 000000000..1ef3e23e9 --- /dev/null +++ b/datanommer.commands/datanommer/commands/populate_json.py @@ -0,0 +1,91 @@ +import datetime +import logging + +import click 
+from fedora_messaging.message import load_message as load_message +from sqlalchemy import cast, select, update +from sqlalchemy.dialects import postgresql + +import datanommer.models as m + +from .utils import config_option, get_config + + +log = logging.getLogger(__name__) + + +@click.command() +@config_option +@click.option( + "--chunk-size", + default=30, + type=int, + show_default=True, + help="Go through messages these many days at a time (lower is slower but saves memory).", +) +@click.option( + "--debug", + is_flag=True, + help="Show more information.", +) +def main(config_path, chunk_size, debug): + """Go over old messages and populate the msg_json field.""" + config = get_config(config_path) + m.init( + config["datanommer_sqlalchemy_url"], + alembic_ini=config["alembic_ini"], + ) + + query = select(m.Message).where(m.Message.msg_json.is_(None)) + first_message = m.session.scalars(query.order_by(m.Message.timestamp).limit(1)).first() + for start_date, end_date in iterate_over_time( + first_message.timestamp, datetime.timedelta(days=chunk_size) + ): + log.debug( + "Converting messages between %s and %s", + start_date.date().isoformat(), + end_date.date().isoformat(), + ) + # Fill the msg_json column from the contents of the msg_raw column + query = ( + update(m.Message) + .where( + m.Message.msg_json.is_(None), + m.Message.timestamp >= start_date, + m.Message.timestamp < end_date, + ) + .values(msg_json=cast(m.Message.msg_raw, postgresql.JSONB(none_as_null=True))) + ) + result = m.session.execute(query) + m.session.commit() + log.debug("Populated %s rows", result.rowcount) + # Empty the msg_raw column where msg_json is now filled + query = ( + update(m.Message) + .where( + m.Message.msg_json.is_not(None), + m.Message.timestamp >= start_date, + m.Message.timestamp < end_date, + ) + .values(msg_raw=None) + ) + result = m.session.execute(query) + log.debug("Purged %s rows", result.rowcount) + + +def iterate_over_time(start_at, interval): + intervals = [] +
start_date = start_at + now = datetime.datetime.now() + while start_date < now: + end_date = start_date + interval + intervals.append((start_date, end_date)) + start_date = end_date + + total = len(intervals) + with click.progressbar(length=total) as bar: + for start_date, end_date in intervals: + yield start_date, end_date + m.session.commit() + m.session.expunge_all() + bar.update(1) diff --git a/datanommer.commands/datanommer/commands/utils.py b/datanommer.commands/datanommer/commands/utils.py new file mode 100644 index 000000000..1b623b367 --- /dev/null +++ b/datanommer.commands/datanommer/commands/utils.py @@ -0,0 +1,52 @@ +import logging + +import click +from fedora_messaging import config as fedora_messaging_config +from fedora_messaging.message import load_message as load_message +from sqlalchemy import func + +import datanommer.models as m + + +# Go through messages this many at a time +CHUNK_SIZE = 10000 +log = logging.getLogger(__name__) + + +def get_config(config_path=None): + if config_path: + fedora_messaging_config.conf.load_config(config_path) + conf = fedora_messaging_config.conf["consumer_config"] + for key in ("datanommer_sqlalchemy_url", "alembic_ini"): + if key not in conf: + raise click.ClickException(f"{key} not defined in the fedora-messaging config") + return conf + + +config_option = click.option( + "-c", + "--config", + "config_path", + help="Load this Fedora Messaging config file", + type=click.Path(exists=True, readable=True), +) + + +def iterate_over_messages(query, chunk_size): + total = m.session.scalar(query.with_only_columns(func.count(m.Message.id))) + if not total: + click.echo("No messages matched.") + return + + click.echo(f"Considering {total} message{'s' if total > 1 else ''}") + + query = query.order_by(m.Message.timestamp) + with click.progressbar(length=total) as bar: + for chunk in range(int(total / chunk_size) + 1): + offset = chunk * chunk_size + chunk_query = query.limit(chunk_size).offset(offset) + for message in
m.session.scalars(chunk_query): + bar.update(1) + yield message + m.session.commit() + m.session.expunge_all() diff --git a/datanommer.commands/pyproject.toml b/datanommer.commands/pyproject.toml index f5a6c317d..06ba3b225 100644 --- a/datanommer.commands/pyproject.toml +++ b/datanommer.commands/pyproject.toml @@ -43,6 +43,7 @@ datanommer-dump = "datanommer.commands:dump" datanommer-stats = "datanommer.commands:stats" datanommer-latest = "datanommer.commands:latest" datanommer-extract-users = "datanommer.commands.extract_users:main" +datanommer-populate-json = "datanommer.commands.populate_json:main" [build-system] diff --git a/datanommer.commands/tests/conftest.py b/datanommer.commands/tests/conftest.py index 4c046f604..54f7e7d86 100644 --- a/datanommer.commands/tests/conftest.py +++ b/datanommer.commands/tests/conftest.py @@ -16,7 +16,7 @@ def mock_init(mocker): @pytest.fixture def mock_config(mocker): mocker.patch.dict( - datanommer.commands.fedora_messaging_config.conf["consumer_config"], + datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], { "datanommer_sqlalchemy_url": "", "alembic_ini": None, diff --git a/datanommer.commands/tests/test_commands.py b/datanommer.commands/tests/test_commands.py index 58c55515c..a532d2631 100644 --- a/datanommer.commands/tests/test_commands.py +++ b/datanommer.commands/tests/test_commands.py @@ -29,7 +29,7 @@ def test_get_datanommer_sqlalchemy_url_keyerror(mocker): mocker.patch.dict( - datanommer.commands.fedora_messaging_config.conf["consumer_config"], + datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], {}, clear=True, ) @@ -42,9 +42,11 @@ def test_get_datanommer_sqlalchemy_url_config(mocker): "datanommer_sqlalchemy_url": "", "alembic_ini": "/some/where", } - mocker.patch.dict(datanommer.commands.fedora_messaging_config.conf["consumer_config"], conf) + mocker.patch.dict( + datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], conf + ) load_config = 
mocker.patch( - "datanommer.commands.fedora_messaging_config.conf.load_config", + "datanommer.commands.utils.fedora_messaging_config.conf.load_config", ) datanommer.commands.get_config("some-path") load_config.assert_called_with("some-path") @@ -53,7 +55,7 @@ def test_get_datanommer_sqlalchemy_url_config(mocker): def test_create(mocker): mock_model_init = mocker.patch("datanommer.commands.m.init") mocker.patch.dict( - datanommer.commands.fedora_messaging_config.conf["consumer_config"], + datanommer.commands.utils.fedora_messaging_config.conf["consumer_config"], { "datanommer_sqlalchemy_url": "TESTURL", "alembic_ini": "/some/where", diff --git a/datanommer.commands/tests/test_populate_json.py b/datanommer.commands/tests/test_populate_json.py new file mode 100644 index 000000000..e9f3f5029 --- /dev/null +++ b/datanommer.commands/tests/test_populate_json.py @@ -0,0 +1,43 @@ +from unittest.mock import Mock + +import pytest +from click.testing import CliRunner + +import datanommer.models as m +from datanommer.commands.populate_json import main as populate_json + +from .utils import generate_bodhi_update_complete_message + + +@pytest.fixture +def bodhi_message_db(datanommer_models): + msg = generate_bodhi_update_complete_message() + m.add(msg) + msg_in_db = m.Message.from_msg_id(msg.id) + msg_in_db.msg_raw = msg_in_db.msg_json + msg_in_db.msg_json = None + m.session.commit() + m.session.refresh(msg_in_db) + assert msg_in_db.msg_json is None + return msg_in_db + + +@pytest.fixture(autouse=True) +def no_expunge(datanommer_models, monkeypatch): + monkeypatch.setattr(m.session, "expunge_all", Mock(name="expunge_all")) + monkeypatch.setattr(m.session, "expunge", Mock(name="expunge")) + + +def test_populate_json(bodhi_message_db, mock_config, mock_init): + runner = CliRunner() + result = runner.invoke(populate_json) + + assert result.exit_code == 0, result.output + + m.session.refresh(bodhi_message_db) + print(bodhi_message_db.msg_json) + assert bodhi_message_db.msg_json is 
not None + assert bodhi_message_db.msg_raw is None + total, _pages, _messages = m.Message.grep(jsons=['$.comment.user.name == "dudemcpants"']) + assert total == 1 + assert _messages == [bodhi_message_db] diff --git a/datanommer.models/datanommer/models/__init__.py b/datanommer.models/datanommer/models/__init__.py index 7dfc17c12..4520e77b3 100644 --- a/datanommer.models/datanommer/models/__init__.py +++ b/datanommer.models/datanommer/models/__init__.py @@ -32,6 +32,7 @@ event, ForeignKey, func, + Index, Integer, not_, or_, @@ -156,7 +157,7 @@ def add(message): msg_id=message.id, topic=message.topic, timestamp=sent_at, - msg=message.body, + msg_json=message.body, headers=headers, users=usernames, packages=packages, @@ -213,7 +214,15 @@ def coerce_compared_value(self, op, value): class Message(DeclarativeBase): __tablename__ = "messages" - __table_args__ = (UniqueConstraint("msg_id", "timestamp"),) + __table_args__ = ( + UniqueConstraint("msg_id", "timestamp"), + Index( + "json_root", + "msg_json", + postgresql_using="gin", + postgresql_ops={"msg_json": "jsonb_path_ops"}, + ), + ) id = Column(Integer, primary_key=True, autoincrement=True) msg_id = Column(Unicode, nullable=True, default=None, index=True) @@ -227,7 +236,8 @@ class Message(DeclarativeBase): crypto = Column(UnicodeText) source_name = Column(Unicode, default="datanommer") source_version = Column(Unicode, default=lambda context: __version__) - msg = Column(_JSONEncodedDict, nullable=False) + msg_raw = Column(_JSONEncodedDict, nullable=True) + msg_json = Column(postgresql.JSONB(none_as_null=True), nullable=True) headers = Column(postgresql.JSONB(none_as_null=True)) users = relationship( "User", @@ -248,6 +258,10 @@ class Message(DeclarativeBase): ), ) + @property + def msg(self): + return self.msg_json if self.msg_json is not None else self.msg_raw + @validates("topic") def get_category(self, key, topic): """Update the category when the topic is set. 
@@ -376,6 +390,8 @@ def grep( topics=None, not_topics=None, contains=None, + jsons=None, + jsons_and=None, defer=False, ): """Flexible query interface for messages. @@ -407,7 +423,17 @@ def grep( ---- - If the `defer` argument evaluates to True, the query won't actually + The ``jsons`` argument is a list of jsonpath filters, please refer to + `PostgreSQL's documentation + `_ + on the matter to learn how to build the jsonpath expression. + + The ``jsons_and`` argument is similar to the ``jsons`` argument, but all + the values must match for a message to be returned. + + ---- + + If the ``defer`` argument evaluates to True, the query won't actually be executed, but a SQLAlchemy query object returned instead. """ @@ -420,6 +446,8 @@ def grep( topics = topics or [] not_topics = not_topics or [] contains = contains or [] + jsons = jsons or [] + jsons_and = jsons_and or [] Message = cls query = select(Message) @@ -451,7 +479,14 @@ def grep( query = query.where(or_(*(Message.topic == topic for topic in topics))) if contains: - query = query.where(or_(*(Message.msg.like(f"%{contain}%") for contain in contains))) + query = query.where( + or_(*(Message.msg_json.as_string().like(f"%{contain}%") for contain in contains)) + ) + + if jsons: + query = query.where(or_(*(Message.msg_json.path_match(j) for j in jsons))) + if jsons_and: + query = query.where(and_(*(Message.msg_json.path_match(j) for j in jsons_and))) # And then the four negative filters as necessary if not_users: diff --git a/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py b/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py new file mode 100644 index 000000000..2b5972a1c --- /dev/null +++ b/datanommer.models/datanommer/models/alembic/versions/f6918385051f_messages_headers_index.py @@ -0,0 +1,29 @@ +"""Messages.headers index + +Revision ID: f6918385051f +Revises: 951c40020acc +Create Date: 2024-05-07 16:05:05.344863 + +""" + +from 
alembic import op + + +# revision identifiers, used by Alembic. +revision = "f6918385051f" +down_revision = "951c40020acc" + + +def upgrade(): + op.create_index( + "ix_messages_headers", + "messages", + ["headers"], + unique=False, + postgresql_using="gin", + postgresql_ops={"msg_json": "jsonb_path_ops"}, + ) + + +def downgrade(): + op.drop_index("ix_messages_headers", table_name="messages", postgresql_using="gin") diff --git a/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py b/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py new file mode 100644 index 000000000..9220618a0 --- /dev/null +++ b/datanommer.models/datanommer/models/alembic/versions/f6d590f5c53f_message_msg_json.py @@ -0,0 +1,43 @@ +"""Message.msg_json + +Revision ID: f6d590f5c53f +Revises: f6918385051f +Create Date: 2024-05-02 14:38:38.399397 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. 
+revision = "f6d590f5c53f" +down_revision = "f6918385051f" + + +def upgrade(): + op.add_column( + "messages", + sa.Column( + "msg_json", postgresql.JSONB(none_as_null=True, astext_type=sa.Text()), nullable=True + ), + ) + op.create_index( + "ix_messages_msg_json_root", + "messages", + ["msg_json"], + unique=False, + postgresql_using="gin", + postgresql_ops={"msg_json": "jsonb_path_ops"}, + ) + # Rename the msg column to msg_raw and make it nullable + op.alter_column("messages", "msg", new_column_name="msg_raw") + op.alter_column("messages", "msg_raw", nullable=True) + + +def downgrade(): + op.alter_column("messages", "msg_raw", nullable=False) + op.alter_column("messages", "msg_raw", new_column_name="msg") + op.drop_index("ix_messages_msg_json_root", table_name="messages", postgresql_using="gin") + op.drop_column("messages", "msg_json") diff --git a/datanommer.models/news/PR1312.feature b/datanommer.models/news/PR1312.feature new file mode 100644 index 000000000..d7a4fc74f --- /dev/null +++ b/datanommer.models/news/PR1312.feature @@ -0,0 +1 @@ +Add a JSONB column to the database to make jsonpath queries \ No newline at end of file diff --git a/datanommer.models/tests/test_model.py b/datanommer.models/tests/test_model.py index d3b45174f..7d8a91065 100644 --- a/datanommer.models/tests/test_model.py +++ b/datanommer.models/tests/test_model.py @@ -419,6 +419,44 @@ def test_grep_contains(datanommer_models): assert r[0].msg == example_message.body +def test_grep_jsons(datanommer_models): + example_message = generate_bodhi_update_complete_message() + dm.add(example_message) + dm.session.flush() + t, p, r = dm.Message.grep(jsons=['$.comment.update.status == "pending"']) + assert t == 1 + assert p == 1 + assert len(r) == 1 + assert r[0].msg == example_message.body + + +def test_grep_jsons_and_all_match(datanommer_models): + example_message = generate_bodhi_update_complete_message() + dm.add(example_message) + dm.session.flush() + t, p, r = dm.Message.grep( +
jsons_and=['$.comment.update.status == "pending"', '$.comment.user.name == "dudemcpants"'] + ) + assert t == 1 + assert p == 1 + assert len(r) == 1 + assert r[0].msg == example_message.body + + +def test_grep_jsons_and_one_match(datanommer_models): + example_message = generate_bodhi_update_complete_message() + dm.add(example_message) + dm.session.flush() + t, p, r = dm.Message.grep( + jsons_and=[ + '$.comment.update.status == "pending"', + '$.comment.user.name == "does-not-match"', + ] + ) + assert t == 0 + assert len(r) == 0 + + def test_grep_rows_per_page_none(datanommer_models): for x in range(0, 200): example_message = generate_message()