Skip to content

Commit

Permalink
Merge pull request from GHSA-7fgc-89cx-w8j5
Browse files Browse the repository at this point in the history
Sanitize indexes in tuple keys when flattening dicts
  • Loading branch information
amercader authored Dec 13, 2023
2 parents 4974db6 + 9393c53 commit bd02018
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 2 deletions.
33 changes: 31 additions & 2 deletions ckan/logic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from ckan.types import (
Action, ChainedAction,
ChainedAuthFunction, DataDict, ErrorDict, Context, FlattenDataDict,
Schema, Validator, ValidatorFactory)
FlattenKey, Schema, Validator, ValidatorFactory
)

Decorated = TypeVar("Decorated")

Expand Down Expand Up @@ -255,7 +256,35 @@ def tuplize_dict(data_dict: dict[str, Any]) -> FlattenDataDict:
except ValueError:
raise df.DataError('Bad key')
tuplized_dict[tuple(key_list)] = value
return tuplized_dict

# Sanitize key indexes to make sure they are sequential
seq_tuplized_dict: FlattenDataDict = {}
# sequential field indexes grouped by common prefix
groups: dict[FlattenKey, dict[FlattenKey, int]] = defaultdict(dict)
for key in sorted(tuplized_dict.keys()):
new_key = key

# iterate over the numeric (index) parts of the key, i.e. positions 1, 3, ...
for idx in range(1, len(key), 2):
# narrow down scope by common prefix
group = groups[key[:idx]]

# if the identifier (i.e. `(extra, 123)`, `(resource, 9)`) is met for
# the first time, generate the next number in the index sequence for
# it. The index of the latest added item always equals the number of
# unique identifiers minus one (because list indexing starts from 0
# in Python). If the identifier is already present (i.e. we process
# `(extra, 10, VALUE)` after processing `(extra, 10, KEY)`), reuse
# the sequential index of this identifier
seq_index = group.setdefault(key[idx-1:idx+1], len(group))

# replace the currently processed key segment with computed
# sequential index
new_key = new_key[:idx] + (seq_index,) + new_key[idx+1:]

seq_tuplized_dict[new_key] = tuplized_dict[key]

return seq_tuplized_dict


def untuplize_dict(tuplized_dict: FlattenDataDict) -> dict[str, Any]:
Expand Down
86 changes: 86 additions & 0 deletions ckan/tests/logic/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from unittest import mock
import pytest
from ckan import logic, model
import ckan.lib.navl.dictization_functions as df

from ckan.types import Context
import ckan.tests.factories as factories

Expand Down Expand Up @@ -100,3 +102,87 @@ def test_check_access_auth_user_for_different_objects():
with pytest.raises(logic.NotAuthorized):
for dataset in dataset3:
logic.check_access("package_show", context, {'id': dataset["id"]})


def test_tuplize_dict():
    """Flat ``__``-joined keys are turned into tuple keys.

    The indexes used here are already sequential, so the numeric
    segments are expected to come out as the same integers.
    """
    flat_form = {
        "author": "Test Author",
        "extras__0__key": "extra1",
        "extras__0__value": "value1",
        "extras__1__key": "extra2",
        "extras__1__value": "value2",
        "extras__2__key": "extra3",
        "extras__2__value": "value3",
        "extras__3__key": "",
        "extras__3__value": "",
        "groups__0__id": "5a65eae8-ef2b-4a85-8022-d9e5a71ad074",
        "name": "test-title",
        "notes": "Test desc",
        "owner_org": "5a65eae8-ef2b-4a85-8022-d9e5a71ad074",
        "private": "True",
        "tag_string": "economy,climate",
        "title": "Test title",
    }

    tuplized = logic.tuplize_dict(flat_form)

    # Every numeric segment is an int, every other segment stays a str.
    assert tuplized == {
        ("author",): "Test Author",
        ("extras", 0, "key"): "extra1",
        ("extras", 0, "value"): "value1",
        ("extras", 1, "key"): "extra2",
        ("extras", 1, "value"): "value2",
        ("extras", 2, "key"): "extra3",
        ("extras", 2, "value"): "value3",
        ("extras", 3, "key"): "",
        ("extras", 3, "value"): "",
        ("groups", 0, "id"): "5a65eae8-ef2b-4a85-8022-d9e5a71ad074",
        ("name",): "test-title",
        ("notes",): "Test desc",
        ("owner_org",): "5a65eae8-ef2b-4a85-8022-d9e5a71ad074",
        ("private",): "True",
        ("tag_string",): "economy,climate",
        ("title",): "Test title",
    }


def test_tuplize_dict_random_indexes():
    """Arbitrary, non-sequential indexes are renumbered from zero.

    Within each group of keys sharing a prefix, the submitted indexes
    (given out of order and with gaps) are expected to be replaced by
    0, 1, 2, ... following the ascending order of the original indexes.
    Nested lists are renumbered independently under their parent item.
    """
    flat_form = {
        "extras__22__key": "extra2",
        "extras__22__value": "value2",
        "extras__1__key": "extra1",
        "extras__1__value": "value1",
        "extras__245566546__key": "extra3",
        "extras__245566546__value": "value3",
        "groups__13__id": "group2",
        "groups__1__id": "group1",
        "groups__13__nested__7__name": "latter",
        "groups__13__nested__2__name": "former",
    }

    tuplized = logic.tuplize_dict(flat_form)

    assert tuplized == {
        # extras: 1 -> 0, 22 -> 1, 245566546 -> 2
        ("extras", 0, "key"): "extra1",
        ("extras", 0, "value"): "value1",
        ("extras", 1, "key"): "extra2",
        ("extras", 1, "value"): "value2",
        ("extras", 2, "key"): "extra3",
        ("extras", 2, "value"): "value3",
        # groups: 1 -> 0, 13 -> 1
        ("groups", 0, "id"): "group1",
        ("groups", 1, "id"): "group2",
        # nested under the former group 13: 2 -> 0, 7 -> 1
        ("groups", 1, "nested", 0, "name"): "former",
        ("groups", 1, "nested", 1, "name"): "latter",
    }


def test_tuplize_dict_wrong_index():
    """A key whose index segment is not an integer raises ``DataError``."""
    malformed = {"extras__2a__key": "extra"}

    # Only the call under test lives inside the ``raises`` block.
    with pytest.raises(df.DataError):
        logic.tuplize_dict(malformed)

0 comments on commit bd02018

Please sign in to comment.