Add metadata support (#335)
* Add metadata support

* The "metadata" field has been renamed to "datadoc_metadata"

* Fix metadata for fields

* Make CI happy

* Make mypy and pre-commit happy

* Bump to new release version

* Only drop polars column if it exists (see the sketch below)

---------

Co-authored-by: Bjørn-Andre Skaar <[email protected]>
mallport and bjornandre authored Feb 13, 2024
1 parent 667e461 commit 83695dc
Showing 25 changed files with 1,119 additions and 1,009 deletions.
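
A minimal sketch (not part of the commit) of the guarded column drop that the last commit-message bullet refers to; the DataFrame contents and column name are hypothetical.

import polars as pl

df = pl.DataFrame({"values": ["a", "b"], "temp_col": [1, 2]})

col_to_drop = "temp_col"  # hypothetical column name
if col_to_drop in df.columns:  # only drop the column when it is actually present
    df = df.drop(col_to_drop)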
1 change: 1 addition & 0 deletions docs/conf.py
@@ -1,4 +1,5 @@
"""Sphinx configuration."""

# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
1 change: 1 addition & 0 deletions noxfile.py
@@ -1,4 +1,5 @@
"""Nox sessions."""

import os
import shlex
import shutil
1,443 changes: 734 additions & 709 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dapla-toolbelt-pseudo"
version = "1.0.4"
version = "1.1.0"
description = "Pseudonymization extensions for Dapla"
authors = ["Dapla Developers <[email protected]>"]
license = "MIT"
@@ -27,6 +27,8 @@ fsspec = ">=2023.5.0"
polars = ">=0.18.2"
pygments = ">2.15.0"
click = ">=8.0.1"
ssb-datadoc-model = ">=4.1.3"


[tool.poetry.group.test.dependencies]
typeguard = ">=2.13.3"
1 change: 1 addition & 0 deletions src/dapla_pseudo/constants.py
@@ -1,4 +1,5 @@
"""This module defines constants that are referenced throughout the codebase."""

from enum import Enum

TIMEOUT_DEFAULT: int = 300 # seconds
1 change: 1 addition & 0 deletions src/dapla_pseudo/models.py
@@ -1,4 +1,5 @@
"""The models module contains base classes used by other models."""

from humps import camelize
from pydantic import BaseModel
from pydantic import ConfigDict
1 change: 1 addition & 0 deletions src/dapla_pseudo/utils.py
@@ -1,4 +1,5 @@
"""Utility functions for Dapla Pseudo."""

import typing as t
from datetime import date
from pathlib import Path
1 change: 1 addition & 0 deletions src/dapla_pseudo/v1/api_models.py
@@ -1,4 +1,5 @@
"""This module defines helper classes and API models used to communicate with the Dapla Pseudo Service."""

import json
import typing as t
from datetime import date
19 changes: 12 additions & 7 deletions src/dapla_pseudo/v1/client.py
@@ -47,7 +47,8 @@ def __auth_token(self) -> str:
audience = os.environ["PSEUDO_SERVICE_URL"]
auth_req = google.auth.transport.requests.Request()
token = t.cast(
str, google.oauth2.id_token.fetch_id_token(auth_req, audience)
str,
google.oauth2.id_token.fetch_id_token(auth_req, audience), # type: ignore[no-untyped-call]
)
return token
else:
@@ -223,9 +224,11 @@ def _extract_name(
def _process_file(
self,
operation: str,
request: PseudonymizeFileRequest
| DepseudonymizeFileRequest
| RepseudonymizeFileRequest,
request: (
PseudonymizeFileRequest
| DepseudonymizeFileRequest
| RepseudonymizeFileRequest
),
file_path: str,
timeout: int,
stream: bool = False,
@@ -247,9 +250,11 @@ def _post_to_file_endpoint(
def _post_to_file_endpoint(
self,
path: str,
request: PseudonymizeFileRequest
| DepseudonymizeFileRequest
| RepseudonymizeFileRequest,
request: (
PseudonymizeFileRequest
| DepseudonymizeFileRequest
| RepseudonymizeFileRequest
),
data: t.BinaryIO,
name: str,
content_type: Mimetypes,
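
For context on the __auth_token hunk above, a self-contained sketch of the ID-token pattern it wraps, assuming the PSEUDO_SERVICE_URL environment variable is set; the helper name is hypothetical.

import os
import typing as t

import google.auth.transport.requests
import google.oauth2.id_token


def fetch_pseudo_service_token() -> str:  # hypothetical helper name
    """Fetch a Google ID token for the pseudo service audience."""
    audience = os.environ["PSEUDO_SERVICE_URL"]
    auth_req = google.auth.transport.requests.Request()
    # fetch_id_token is untyped upstream, hence the cast (and the type: ignore in the diff)
    return t.cast(str, google.oauth2.id_token.fetch_id_token(auth_req, audience))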
84 changes: 23 additions & 61 deletions src/dapla_pseudo/v1/depseudo.py
@@ -1,13 +1,12 @@
"""Builder for submitting a pseudonymization request."""
import json

import typing as t
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from typing import Optional

import pandas as pd
import polars as pl
import requests
from requests import Response

from dapla_pseudo.constants import TIMEOUT_DEFAULT
@@ -24,8 +23,11 @@
from dapla_pseudo.v1.api_models import PseudoRule
from dapla_pseudo.v1.ops import _client
from dapla_pseudo.v1.pseudo_commons import File
from dapla_pseudo.v1.pseudo_commons import PseudoFieldResponse
from dapla_pseudo.v1.pseudo_commons import PseudoFileResponse
from dapla_pseudo.v1.pseudo_commons import RawPseudoMetadata
from dapla_pseudo.v1.pseudo_commons import get_file_data_from_dataset
from dapla_pseudo.v1.result import PseudoFileResponse
from dapla_pseudo.v1.pseudo_commons import pseudonymize_operation_field
from dapla_pseudo.v1.result import Result


@@ -84,7 +86,6 @@ def __init__(self, rules: Optional[list[PseudoRule]] = None) -> None:
"""Initialize the class."""
self._rules: list[PseudoRule] = [] if rules is None else rules
self._pseudo_keyset: Optional[PseudoKeyset | str] = None
self._metadata: dict[str, str] = {}
self._timeout: int = TIMEOUT_DEFAULT

def on_fields(self, *fields: str) -> "Depseudonymize._DepseudoFuncSelector":
@@ -156,7 +157,6 @@ def _depseudonymize_file(self) -> Result:
)
return Result(
PseudoFileResponse(response, file.content_type, streamed=True),
self._metadata,
)

def _depseudonymize_field(self) -> Result:
@@ -171,7 +171,7 @@ def _depseudonymize_field(self) -> Result:

def depseudonymize_field_runner(
field_name: str, series: pl.Series, pseudo_func: PseudoFunction
) -> tuple[str, pl.Series]:
) -> tuple[str, pl.Series, RawPseudoMetadata]:
"""Function that performs the pseudonymization on a pandas Series.
Args:
@@ -182,23 +182,21 @@ def depseudonymize_field_runner(
Returns:
tuple[str, pl.Series, RawPseudoMetadata]: A tuple containing the field_name, the corresponding series, and the raw metadata for the field.
"""
return (
field_name,
_do_depseudonymize_field(
path="depseudonymize/field",
field_name=field_name,
values=series.to_list(),
pseudo_func=pseudo_func,
metadata_map=self._metadata,
timeout=self._timeout,
keyset=KeyWrapper(self._pseudo_keyset).keyset,
),
data, metadata = pseudonymize_operation_field(
path="depseudonymize/field",
field_name=field_name,
values=series.to_list(),
pseudo_func=pseudo_func,
timeout=self._timeout,
keyset=KeyWrapper(self._pseudo_keyset).keyset,
)
return field_name, data, metadata

dataframe = t.cast(pl.DataFrame, Depseudonymize.dataset)
# Execute the pseudonymization API calls in parallel
with ThreadPoolExecutor() as executor:
depseudonymized_field: dict[str, pl.Series] = {}
raw_metadata_fields: list[RawPseudoMetadata] = []
futures = [
executor.submit(
depseudonymize_field_runner,
@@ -210,14 +208,17 @@ def depseudonymize_field_runner(
]
# Wait for the futures to finish, then add each field to pseudonymized_field map
for future in as_completed(futures):
result = future.result()
# Each future result contains the field_name (0) and the pseudonymize_values (1)
depseudonymized_field[result[0]] = result[1]
field_name, data, raw_metadata = future.result()
depseudonymized_field[field_name] = data
raw_metadata_fields.append(raw_metadata)

depseudonymized_df = pl.DataFrame(depseudonymized_field)
dataframe = dataframe.update(depseudonymized_df)

return Result(pseudo_response=dataframe, metadata=self._metadata)
return Result(
pseudo_response=PseudoFieldResponse(
data=dataframe, raw_metadata=raw_metadata_fields
)
)

class _DepseudoFuncSelector:
def __init__(
@@ -283,42 +284,3 @@ def _rule_constructor(
for field in self._fields
]
return Depseudonymize._Depseudonymizer(self._existing_rules + rules)


def _do_depseudonymize_field(
path: str,
field_name: str,
values: list[str],
pseudo_func: Optional[PseudoFunction],
metadata_map: dict[str, str],
timeout: int,
keyset: Optional[PseudoKeyset] = None,
) -> pl.Series:
"""Makes pseudonymization API calls for a list of values for a specific field and processes it into a polars Series.
Args:
path (str): The path to the pseudonymization endpoint.
field_name (str): The name of the field being pseudonymized.
values (list[str]): The list of values to be pseudonymized.
pseudo_func (Optional[PseudoFunction]): The pseudonymization function to apply to the values.
metadata_map (Dict[str, str]): A dictionary to store the metadata associated with each field.
timeout (int): The timeout in seconds for the API call.
keyset (Optional[PseudoKeyset], optional): The pseudonymization keyset to use. Defaults to None.
Returns:
pl.Series: A pandas Series containing the pseudonymized values.
"""
response: requests.Response = _client()._post_to_field_endpoint(
path, field_name, values, pseudo_func, timeout, keyset, stream=True
)
metadata_map[field_name] = str(response.headers.get("metadata") or "")

# The response content is received as a buffered byte stream from the server.
# We decode the content using UTF-8, which gives us a List[List[str]] structure.
# To obtain a single list of strings, we combine the values from the nested sublists into a flat list.
nested_list = json.loads(response.content.decode("utf-8"))
combined_list = []
for sublist in nested_list:
combined_list.extend(sublist)

return pl.Series(combined_list)
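
For context on the removed _do_depseudonymize_field helper: a minimal sketch of the response flattening it described, where the field endpoint's JSON payload (a list of lists of strings) is combined into a flat list and wrapped in a polars Series. The byte string stands in for response.content and is purely illustrative.

import json

import polars as pl

raw_content = b'[["a1", "b2"], ["c3"]]'  # stand-in for response.content

# Decode the buffered byte stream as UTF-8 JSON, giving a list[list[str]] structure
nested_list: list[list[str]] = json.loads(raw_content.decode("utf-8"))

# Combine the values from the nested sublists into a single flat list
combined_list: list[str] = [value for sublist in nested_list for value in sublist]

series = pl.Series(combined_list)  # -> Series(["a1", "b2", "c3"])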
1 change: 1 addition & 0 deletions src/dapla_pseudo/v1/ops.py
@@ -5,6 +5,7 @@
hierarchical data structures, many users will be just fine with using one key and just listing the fields (of their
flat data).
"""

import io
import mimetypes
import os