Index metadata fields (#3656)
* Index metadata fields

* Index package workflow in ES (#3706)

* fix test

* use json pointer instead of dot path

* index config version id

* index text arrays, always put "text" value

* fix test

* use global logger instead of print

* remove a comment

* tests

* moar tests

* lint
sir-sigurd authored Oct 13, 2023
1 parent 2cb6984 commit bf660b0
Showing 3 changed files with 277 additions and 2 deletions.
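
For orientation: the change makes the ES indexer emit one typed entry per metadata leaf, each carrying a JSON pointer, a detected type, and the raw JSON text. A minimal sketch of the output shape (mirroring the test expectation added below, not code from the commit):

    index.get_metadata_fields({"foo": "bar"}) == [
        {
            "json_pointer": "/foo",
            "type": "keyword",
            "keyword": "bar",
            "text": '"bar"',
        },
    ]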
98 changes: 96 additions & 2 deletions lambdas/es/indexer/index.py
@@ -51,12 +51,14 @@
 import os
 import pathlib
 import re
+import urllib.parse
 from os.path import split
 from typing import Optional, Tuple
 from urllib.parse import unquote_plus
 
 import boto3
 import botocore
+import jsonpointer
 import nbformat
 from dateutil.tz import tzutc
 from document_queue import (
@@ -179,6 +181,9 @@
 USER_AGENT_EXTRA = " quilt3-lambdas-es-indexer"
 
 
+logger = get_quilt_logger()
+
+
 def now_like_boto3():
     """ensure timezone UTC for consistency with boto3:
     Example of what boto3 returns on head_object:
@@ -299,6 +304,94 @@ def do_index(
         logger_.debug("%s indexed as package (%s)", key, event_type)
 
 
+def _try_parse_date(s: str) -> Optional[datetime.datetime]:
+    # XXX: do we need to support more formats?
+    if s[-1:] == "Z":
+        s = s[:-1]
+    try:
+        return datetime.datetime.fromisoformat(s)
+    except ValueError:
+        return None
+
+
+MAX_KEYWORD_LEN = 256
+
+
+def _get_metadata_fields(path: tuple, d: dict):
+    for k, raw_value in d.items():
+        if isinstance(raw_value, dict):
+            yield from _get_metadata_fields(path + (k,), raw_value)
+        else:
+            v = raw_value
+            if isinstance(v, str):
+                date = _try_parse_date(v)
+                if date is not None:
+                    type_ = "date"
+                    v = date
+                else:
+                    type_ = "keyword" if len(v) <= MAX_KEYWORD_LEN else "text"
+            elif isinstance(v, bool):
+                type_ = "boolean"
+            elif isinstance(v, (int, float)):
+                # XXX: do something on ints that can't be converted to float without loss?
+                type_ = "double"
+            elif isinstance(v, list):
+                if not (v and all(isinstance(x, str) for x in v)):
+                    continue
+                type_ = "keyword" if all(len(x) <= MAX_KEYWORD_LEN for x in v) else "text"
+            else:
+                logger.warning("ignoring value of type %s", type(v))
+                continue
+
+            yield path + (k,), type_, raw_value, v
+
+
+def get_metadata_fields(meta):
+    if not isinstance(meta, dict):
+        # XXX: can we do something better?
+        return None
+    return [
+        {
+            "json_pointer": jsonpointer.JsonPointer.from_parts(path).path,
+            "type": type_,
+            "text": json.dumps(raw_value, ensure_ascii=False),
+            type_: value,
+        }
+        for path, type_, raw_value, value in _get_metadata_fields((), meta)
+    ]
+
+
+def _prepare_workflow_for_es(workflow, bucket):
+    if workflow is None:
+        return None
+
+    try:
+        config_url = workflow["config"]
+        if not config_url.startswith(f"s3://{bucket}/.quilt/workflows/config.yml"):
+            raise Exception(f"Bad workflow config URL {config_url}")
+
+        config_url_parsed = urllib.parse.urlparse(config_url)
+        query = urllib.parse.parse_qs(config_url_parsed.query)
+        version_id = query.pop('versionId', [None])[0]
+        if query:
+            raise Exception(f"Unexpected S3 query string: {config_url_parsed.query!r}")
+
+        return {
+            "config_version_id": version_id,  # XXX: how to handle None?
+            "id": workflow["id"],
+            "schemas": [
+                {
+                    "id": k,
+                    "url": v,
+                }
+                for k, v in workflow.get("schemas", {}).items()
+            ],
+        }
+    except Exception:
+        logger.exception("Bad workflow object: %s", json.dumps(workflow, indent=2))
+        return None
+
+
 def index_if_package(
     s3_client,
     doc_queue: DocumentQueue,
@@ -351,7 +444,6 @@ def get_pkg_data():
                 return
 
             user_meta = first.get("user_meta")
-            user_meta = json.dumps(user_meta) if user_meta else None
 
             return {
                 "key": key,
@@ -363,8 +455,10 @@
                 "pointer_file": pointer_file,
                 "hash": package_hash,
                 "package_stats": stats,
-                "metadata": user_meta,
+                "metadata": json.dumps(user_meta) if user_meta else None,
+                "metadata_fields": get_metadata_fields(user_meta),
                 "comment": str(first.get("message", "")),
+                "workflow": _prepare_workflow_for_es(first.get("workflow"), bucket),
             }
 
         data = get_pkg_data() or {}
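As a further illustrative sketch (grounded in the helpers above, not code from the commit), nested metadata is walked recursively, each leaf is addressed by an RFC 6901 JSON pointer, and ISO 8601 strings, with or without a trailing "Z", are coerced to dates:

    # Hypothetical input; output follows _get_metadata_fields/_try_parse_date above:
    index.get_metadata_fields({"dates": {"created": "2023-10-13T09:10:23Z"}}) == [
        {
            "json_pointer": "/dates/created",
            "type": "date",
            "text": '"2023-10-13T09:10:23Z"',
            "date": datetime.datetime(2023, 10, 13, 9, 10, 23),
        },
    ]
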
1 change: 1 addition & 0 deletions lambdas/es/indexer/requirements.txt
@@ -18,6 +18,7 @@ importlib-metadata==6.6.0
 ipython-genutils==0.2.0
 jmespath==0.9.4
 jsonschema==3.2.0
+jsonpointer==2.4
 jupyter-core==4.11.2
 lxml==4.9.2
 nbformat==5.1.3
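The new jsonpointer dependency provides the RFC 6901 escaping relied on above. A small usage sketch (illustrative, not part of the commit):

    import jsonpointer

    # "/" inside a key is escaped as "~1" (a literal "~" would become "~0"):
    assert jsonpointer.JsonPointer.from_parts(["a/a"]).path == "/a~1a"
    # The pointer resolves back to the original value:
    assert jsonpointer.resolve_pointer({"a/a": 1}, "/a~1a") == 1
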
180 changes: 180 additions & 0 deletions lambdas/es/indexer/test/test_index.py
@@ -1038,7 +1038,16 @@ def test_index_if_package(self, append_mock, select_meta_mock, select_stats_mock
             "hash": pkg_hash,
             "package_stats": select_stats_mock.return_value,
             "metadata": json.dumps(meta),
+            "metadata_fields": [
+                {
+                    "json_pointer": "/foo",
+                    "type": "keyword",
+                    "keyword": "bar",
+                    "text": '"bar"',
+                },
+            ],
             "comment": message,
+            "workflow": None,
         })
 
     def test_index_if_package_skip(self):
Expand Down Expand Up @@ -1829,3 +1838,174 @@ def test_extract_pptx():
result = index.extract_pptx(buf, len(lorem) * 4 - 1)

assert result == "\n".join([lorem] * 3)


TEXT_VALUE = (index.MAX_KEYWORD_LEN + 1) * "a"
KEYWORD_VALUE = "a"


@pytest.mark.parametrize(
"src_value, expected_field",
[
(
TEXT_VALUE,
{
"type": "text",
"text": TEXT_VALUE,
},
),
(
[TEXT_VALUE, TEXT_VALUE],
{
"type": "text",
"text": [TEXT_VALUE, TEXT_VALUE],
},
),
(
[KEYWORD_VALUE, KEYWORD_VALUE],
{
"type": "keyword",
"keyword": [KEYWORD_VALUE, KEYWORD_VALUE],
"text": json.dumps([KEYWORD_VALUE, KEYWORD_VALUE]),
},
),
(
[KEYWORD_VALUE, TEXT_VALUE],
{
"type": "text",
"text": [KEYWORD_VALUE, TEXT_VALUE],
},
),
(
1,
{
"type": "double",
"text": json.dumps(1),
"double": 1,
},
),
(
1.2,
{
"type": "double",
"text": json.dumps(1.2),
"double": 1.2,
},
),
(
"2023-10-13T09:10:23.873434",
{
"type": "date",
"text": json.dumps("2023-10-13T09:10:23.873434"),
"date": datetime.datetime(2023, 10, 13, 9, 10, 23, 873434),
},
),
(
"2023-10-13T09:10:23.873434Z",
{
"type": "date",
"text": json.dumps("2023-10-13T09:10:23.873434Z"),
"date": datetime.datetime(2023, 10, 13, 9, 10, 23, 873434),
},
),
(
True,
{
"type": "boolean",
"text": json.dumps(True),
"boolean": True,
},
),
],
)
def test_get_metadata_fields_values(src_value, expected_field):
field_name = "a"

assert index.get_metadata_fields(
{
field_name: src_value,
}
) == [
{
"json_pointer": f"/{field_name}",
**expected_field,
}
]


@pytest.mark.parametrize(
"src_value",
[
None,
[1, TEXT_VALUE],
],
)
def test_get_metadata_fields_values_ignored(src_value):
field_name = "a"

assert index.get_metadata_fields(
{
field_name: src_value,
}
) == []


@pytest.mark.parametrize(
"metadata, expected_json_pointer",
[
(
{
"a": TEXT_VALUE,
},
"/a",
),
(
{
"a": {
"a": TEXT_VALUE,
},
},
"/a/a",
),
(
{
"a.a": TEXT_VALUE,
},
"/a.a",
),
(
{
"a/a": TEXT_VALUE,
},
"/a~1a",
),
],
)
def test_get_metadata_fields_json_pointer(metadata, expected_json_pointer):
field, = index.get_metadata_fields(metadata)
assert field["json_pointer"] == expected_json_pointer


def test_prepare_workflow_for_es():
assert index._prepare_workflow_for_es(
{
"config": "s3://BUCKET/.quilt/workflows/config.yml?versionId=asdf",
"id": "workflow-id",
"schemas": {
"schema-id": "schema-url",
},
},
"BUCKET",
) == {
"config_version_id": "asdf",
"id": "workflow-id",
"schemas": [
{
"id": "schema-id",
"url": "schema-url",
}
],
}
