From cfbd18b258cc710d98d1988672f1058451a6e451 Mon Sep 17 00:00:00 2001 From: Joe Date: Sat, 11 May 2024 16:31:29 -0600 Subject: [PATCH] add additional filter and order-by options to queries; implement Django-style Manager `.objects` on models --- CHANGES.md | 6 ++ VERSION | 2 +- civipy/base/base.py | 67 ++++++++++------ civipy/base/utils.py | 41 +++++----- civipy/contact.py | 55 +++++++++++++- civipy/financial.py | 4 +- civipy/interface/base.py | 36 ++++++++- civipy/interface/query.py | 155 ++++++++++++++++++++++++++++++++++++++ civipy/interface/v3.py | 56 ++++++++++++-- civipy/interface/v4.py | 55 ++++++++++++-- civipy/membership.py | 19 +++++ civipy/note.py | 6 +- civipy/user.py | 14 ++-- 13 files changed, 442 insertions(+), 74 deletions(-) create mode 100644 civipy/interface/query.py diff --git a/CHANGES.md b/CHANGES.md index 0d71b0c..da02671 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,9 +1,15 @@ +### 0.1.0 [pending release] + +- Drop deprecated model methods + ### 0.0.3 [pending release] - Add CiviCRM v4 API usage and make it the default - Drop allowing `CiviCRMBase` to be used directly (must subclass to access a CiviCRM entity) - Update settings to automatically search for a `.civipy` or `pyproject.toml` file - "Drop" Python 3.8 "support" +- Implement Django-style `.objects` Manager on object models +- Deprecate model-level `find`, `get`, `create`, etc. methods ### 0.0.2 January 13, 2024 diff --git a/VERSION b/VERSION index 7bcd0e3..6812f81 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.2 \ No newline at end of file +0.0.3 \ No newline at end of file diff --git a/civipy/base/base.py b/civipy/base/base.py index 8fc56bc..e545764 100644 --- a/civipy/base/base.py +++ b/civipy/base/base.py @@ -1,18 +1,30 @@ import json from typing import TypeVar +from warnings import warn from civipy.base.config import logger from civipy.base.utils import get_unique from civipy.exceptions import CiviProgrammingError -from civipy.interface import get_interface, CiviValue, CiviResponse, Interface +from civipy.interface import CiviValue, CiviResponse +from civipy.interface.query import Query CiviEntity = TypeVar("CiviEntity", bound="CiviCRMBase") -class CiviCRMBase: +class MetaCiviCRM(type): + def __new__(typ, name, bases, dct): + cls = super().__new__(typ, name, bases, dct) + cls.objects = Query(cls) + return cls + + +class CiviCRMBase(metaclass=MetaCiviCRM): + objects: Query + @classmethod def get(cls, **kwargs) -> CiviResponse: """Make an API request with the "get" action and return the full response.""" - query = cls._interface().limit(25) + warn("model.get will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) + query = cls.objects._interface().limit(25) query.update(kwargs) return cls.action("get", **query) @@ -20,7 +32,8 @@ def get(cls, **kwargs) -> CiviResponse: def create(cls, **kwargs: CiviValue) -> CiviEntity: """Make an API request with the "create" action and return an object of class cls populated with the created object's data.""" - query = cls._interface().values(kwargs) + warn("model.create will be removed in v0.1.0, use model.save", DeprecationWarning, stacklevel=2) + query = cls.objects._interface().values(kwargs) response = cls.action("create", **query) logger.debug("new record created! full response: %s" % str(response)) return cls(get_unique(response)) @@ -28,10 +41,25 @@ def create(cls, **kwargs: CiviValue) -> CiviEntity: def update(self: CiviEntity, **kwargs: CiviValue) -> CiviResponse: """Update the current object with the values specified in kwargs. Returns the full API response.""" - self.civi.update(kwargs) - query = self._interface().where({"id": self.civi_id}) - query.update(self._interface().values(kwargs)) - return self.action("create", **query) + warn("model.update will be removed in v0.1.0, use model.save", DeprecationWarning, stacklevel=2) + kwargs["id"] = self.civi_id + query = self.objects._interface().values(kwargs) + return self.action("update", **query) + + def save(self: CiviEntity) -> CiviEntity: + """Save the current instance.""" + action = "create" if self.civi_id is None else "update" + query = self.objects._interface().values(self.civi) + response = self.action(action, **query) + return self.__class__(get_unique(response)) + + def delete(self: CiviEntity, check_permissions: bool = True, use_trash: bool = True) -> CiviResponse: + query = self.objects._interface().where({"id": self.civi_id}) + if check_permissions is False: + query["checkPermissions"] = False + if use_trash is False: + query["useTrash"] = False + return self.action("delete", **query) @classmethod def find(cls, select: list[str] | None = None, **kwargs: CiviValue) -> CiviEntity | None: @@ -41,7 +69,8 @@ def find(cls, select: list[str] | None = None, **kwargs: CiviValue) -> CiviEntit Returns an object of class cls populated with this object's data if found, otherwise returns None.""" - query = cls._interface().where(kwargs) + warn("model.find will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) + query = cls.objects._interface().where(kwargs) if select: query["select"] = select response = cls.get(**query) @@ -57,7 +86,8 @@ def find_all(cls, select: list[str] | None = None, **kwargs: CiviValue) -> list[ Returns a list of objects of class cls populated with data. Returns an empty list if no matching values found.""" - query = cls._interface().where(kwargs) + warn("model.find_all will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) + query = cls.objects._interface().where(kwargs) if select: query["select"] = select response = cls.action("get", **query) @@ -72,7 +102,8 @@ def find_and_update(cls, where: CiviValue, **kwargs: CiviValue) -> CiviEntity | Returns an object of class cls populated with this object's data if found, otherwise returns None.""" - query = cls._interface().where(where) + warn("model.find_and_update will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) + query = cls.objects._interface().where(where) response = cls.get(**query) if response["count"] == 0: return None @@ -97,6 +128,7 @@ def find_or_create(cls, where: CiviValue, do_update: bool = False, **kwargs: Civ Returns an object of class cls populated with the found, updated, or created object's data.""" + warn("model.find_or_create will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) if do_update: obj = cls.find_and_update(where, **kwargs) else: @@ -111,20 +143,11 @@ def find_or_create(cls, where: CiviValue, do_update: bool = False, **kwargs: Civ @classmethod def action(cls, action: str, **kwargs) -> CiviResponse: """Calls the CiviCRM API action and returns parsed JSON on success.""" + warn("model.action will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2) entity = cls.__name__[4:] if entity == "CRMBase": raise CiviProgrammingError("Subclass CiviCRMBase to create an unsupported Entity.") - return cls._interface()(action, entity, kwargs) - - _interface_reference: Interface | None = None - - @classmethod - def _interface(cls) -> Interface: - """Instantiate the appropriate API interface and store it on the CiviCRMBase class, - so that it will be instantiated only once and available to all entity classes.""" - if CiviCRMBase._interface_reference is None: - CiviCRMBase._interface_reference = get_interface() - return CiviCRMBase._interface_reference + return cls.objects._interface().execute(action, entity, kwargs) def pprint(self: CiviEntity) -> None: """Print the current record's data in a human-friendly format.""" diff --git a/civipy/base/utils.py b/civipy/base/utils.py index a6e669a..7d777b3 100644 --- a/civipy/base/utils.py +++ b/civipy/base/utils.py @@ -7,31 +7,26 @@ def get_unique(response: "CiviResponse") -> "CiviValue": """Enforce that exactly one record was returned in `response` and return just the record.""" - if isinstance(response["values"], dict): - ident = list(response["values"].keys())[0] - return get_unique_value_for_id(ident, response) - else: - return get_unique_value(response) - - -def get_unique_value(response: "CiviResponse") -> "CiviValue": - """Return one value from a non-indexed response.""" - assert_unique(response) - return response["values"][0] - - -def get_unique_value_for_id(record_id: str, response: "CiviResponse") -> "CiviValue": - """Return one value from an indexed response.""" - assert_unique(response) - return response["values"][record_id] - - -def assert_unique(response: "CiviResponse") -> None: - """Raise exception if `response` does not have exactly one record.""" - if not response or "count" not in response: + if not response or "count" not in response or "values" not in response: raise CiviProgrammingError(f"Unexpected response {response}") + record_id = next(iter(response["values"].keys())) if isinstance(response["values"], dict) else 0 + + # Raise exception if `response` does not have exactly one record. count = response["count"] if count == 0: raise NoResultError("No results in response.") - if count > 1: + if count > 1 and not is_identical(response["values"]): raise NonUniqueResultError(f"Response is not unique, has {count} results.") + + return response["values"][record_id] + + +def is_identical(values: list["CiviValue"] | dict[str, "CiviValue"]) -> bool: + """Check if multiple returned values are the same item and can be treated as unique.""" + # I have experienced the V4 REST API returning two instances of a record. The request was + # `CiviContact.find(select=["*", "email_primary.email"], id=11722)`. This was likely due + # to the join to the email table, so I'm treating this as an implicit "select distinct" + if isinstance(values, dict): + values = list(values.values()) + first = values[0] + return all((value == first for value in values[1:])) diff --git a/civipy/contact.py b/civipy/contact.py index 0073c0b..2b135c7 100644 --- a/civipy/contact.py +++ b/civipy/contact.py @@ -1,6 +1,9 @@ import json -from civipy.base.base import CiviCRMBase +from typing import Iterable, Literal +from civipy.base.base import CiviCRMBase, CiviEntity +from civipy.address import CiviCountry from civipy.exceptions import CiviProgrammingError +from civipy.interface import CiviValue from civipy.note import CiviNotable @@ -10,7 +13,22 @@ class CiviContact(CiviNotable): @classmethod def find_by_email(cls, email_address: str, select: list[str] | None = None): email_obj = CiviEmail.find(select=["contact_id"], email=email_address) - return cls.find(select=select, id=email_obj["contact_id"]) + return cls.find(select=select, id=email_obj.civi["contact_id"]) + + @classmethod + def find_all_by_email(cls, email_address: str, select: list[str] | None = None): + return [ + cls.find(select=select, id=email_obj.civi["contact_id"]) + for email_obj in CiviEmail.find_all(select=["contact_id"], email=email_address) + ] + + @classmethod + def create(cls, **kwargs: CiviValue) -> "CiviContact": + country_code: str | None = kwargs.pop("country_code", None) + if country_code is not None: + country = CiviCountry.find_by_country_code(country_code=country_code, select=["id"]) + kwargs["country_id"] = country.id + return super().create(**kwargs) class CiviEmail(CiviCRMBase): @@ -26,6 +44,28 @@ class CiviWebsite(CiviCRMBase): class CiviRelationship(CiviCRMBase): + @classmethod + def find_all(cls, select: list[str] | None = None, **kwargs: CiviValue) -> list[CiviEntity]: + if select is None: + return cls.objects.filter(**kwargs).all() + return cls.objects.filter(**kwargs).values(*select).all() + + @classmethod + def query_filter_hook(cls, version: Literal["3", "4"], query: CiviValue) -> CiviValue: + if version == "3" or "where" not in query: + return query + contact_id = None + parts = [] + for k, c, v in query["where"]: + if k == "contact_id": + contact_id = v + parts.append(["OR", [["contact_id_a", "=", contact_id], ["contact_id_b", "=", contact_id]]]) + continue + parts.append([k, c, v]) + if contact_id is not None: + query["where"] = parts + return query + @classmethod def create_or_increment_relationship( cls, @@ -99,8 +139,17 @@ class CiviGroupContact(CiviCRMBase): class CiviGroup(CiviCRMBase): @classmethod def find_by_title(cls, title: str, select: list[str] | None = None) -> "CiviGroup": - """Creates a new CiviGroup object populated with data for the group entitled "title".""" + """Finds a CiviGroup object for the group entitled "title".""" return cls.find(select=select, title=title) + def update_all(self, contacts: Iterable[int | CiviContact]) -> None: + query = CiviGroupContact.objects.filter(group_id=self.id).values("contact_id") + current_pop = {c.contact_id for c in query} + contact_ids = {c.id if isinstance(c, CiviContact) else c for c in contacts} + for contact_id in current_pop.difference(contact_ids): + CiviGroupContact.objects.filter(group_id=self.id, contact_id=contact_id).delete() + for contact_id in contact_ids.difference(current_pop): + CiviGroupContact.objects.create(group_id=self.id, contact_id=contact_id) + def add_member(self, civi_contact: CiviContact) -> CiviGroupContact: return CiviGroupContact.find_or_create(where={"contact_id": civi_contact.id, "group_id": self.id}) diff --git a/civipy/financial.py b/civipy/financial.py index 1b64109..baa6ff0 100644 --- a/civipy/financial.py +++ b/civipy/financial.py @@ -7,11 +7,11 @@ def cancel(cls, **kwargs): return cls.action("cancel", **kwargs) @classmethod - def find_by_transaction_id(cls, trxn_id: str) -> "CiviFinancialTrxn | None": + def find_by_transaction_id(cls, trxn_id: str, entity_table: str) -> "CiviEntityFinancialTrxn | None": """Find a Contribution Payment by payment transaction ID""" kwargs = { "select": ["*", "financial_trxn_id.*"], - "entity_table": "civicrm_contribution", + "entity_table": entity_table, "financial_trxn_id.trxn_id": trxn_id, } found = cls.find_all(**kwargs) diff --git a/civipy/interface/base.py b/civipy/interface/base.py index 4b11105..1e826da 100644 --- a/civipy/interface/base.py +++ b/civipy/interface/base.py @@ -1,4 +1,5 @@ from typing import TypedDict, Literal, Callable +from warnings import warn CiviValue = dict[str, int | str] CiviV3Request = CiviValue @@ -60,14 +61,47 @@ class BaseInterface: The `values` method is a helper function to generate parameters for create/update. """ + api_version: str = "" + def __init__(self): self.func: Callable[[str, str, CiviValue], CiviResponse] | None = None def __call__(self, action: str, entity: str, params: CiviValue) -> CiviResponse: + warn( + "interface.__call__ will be removed in v0.1.0, use interface.execute instead", + DeprecationWarning, + stacklevel=2 + ) + return self.execute(action, entity, params) + + def execute(self, action: str, entity: str, params: CiviValue) -> CiviResponse: + raise NotImplementedError + + operators = { + "lte": "<=", + "gte": ">=", + "lt": "<", + "gt": ">", + "ne": "!=", + "like": "LIKE", + "not_like": "NOT LIKE", + "in": "IN", + "not_in": "NOT IN", + "between": "BETWEEN", + "not_between": "NOT BETWEEN", + "isnull": "IS NULL", + } + + @staticmethod + def select(fields: list[str]) -> CiviValue | CiviV4Request: + raise NotImplementedError + + @staticmethod + def sort(kwargs: CiviValue) -> CiviValue | CiviV4Request: raise NotImplementedError @staticmethod - def limit(value: int) -> CiviValue | CiviV4Request: + def limit(value: int, offset: int | None = None) -> CiviValue | CiviV4Request: raise NotImplementedError @staticmethod diff --git a/civipy/interface/query.py b/civipy/interface/query.py new file mode 100644 index 0000000..155332a --- /dev/null +++ b/civipy/interface/query.py @@ -0,0 +1,155 @@ +from typing import TYPE_CHECKING, Type, Optional +from civipy.interface import get_interface, Interface + +if TYPE_CHECKING: + from civipy.base.base import CiviCRMBase + from civipy.interface import CiviValue + + +class BaseQuery: + def __init__(self, model: Type["CiviCRMBase"]) -> None: + self.model = model + if model.__name__ in ("MetaCiviCRM", "CiviCRMBase"): + self._entity: str | None = None + else: + self._entity = model.__name__[4:] if model.__name__[:4] == "Civi" else model.__name__ + self._select = None + self._filter = None + self._limit = None + self._order = None + self._result_cache: list | None = None + + def __repr__(self) -> str: + data = list(self[:21]) + if len(data) > 20: + data[-1] = "...(remaining elements truncated)..." + return f"<{self.__class__.__name__} {data}>" + + def __len__(self): + self._fetch_all() + return len(self._result_cache) + + def __iter__(self): + self._fetch_all() + return iter(self._result_cache) + + def __bool__(self): + self._fetch_all() + return bool(self._result_cache) + + def __getitem__(self, k: int | slice): + """Retrieve an item or slice from the set of results.""" + if isinstance(k, int): + if k < 0: + raise ValueError("Negative indexing is not supported.") + if self._result_cache is not None: + return self._result_cache[k] + query = self._chain() + query._limit = {"offset": k, "limit": 1} + query._fetch_all() + return query._result_cache[0] + + if not isinstance(k, slice): + raise TypeError(f"QuerySet indices must be integers or slices, not {type(k).__name__}.") + if (k.start is not None and k.start < 0) or (k.stop is not None and k.stop < 0): + raise ValueError("Negative indexing is not supported.") + if self._result_cache is not None: + return self._result_cache[k] + query = self._chain() + start, stop, step = [None if v is None else int(v) for v in (k.start, k.stop, k.step)] + query._limit = {} if start is None else {"offset": start} + query._limit["limit"] = stop - query._limit.get("offset", 0) + return list(query)[:: step] if step else query + + _interface_reference: Interface | None = None + + @classmethod + def _interface(cls) -> Interface: + """Instantiate the appropriate API interface and store it on the base Query class, + so that it will be instantiated only once and available to all entity classes.""" + if BaseQuery._interface_reference is None: + BaseQuery._interface_reference = get_interface() + return BaseQuery._interface_reference + + def _chain(self): + query = self.__class__(model=self.model) + query._select = self._select + query._limit = self._limit + query._filter = self._filter + query._order = self._order + return query + + def _compose(self, values: Optional["CiviValue"] = None) -> "CiviValue": + """Create a query for the selected API version from the current attributes.""" + interface = self._interface() + params = {} + if self._select is not None: + self._deep_update(params, interface.select(self._select)) + if self._filter is not None: + where = interface.where(self._filter) + if hasattr(self.model, "query_filter_hook"): + where = self.model.query_filter_hook(interface.api_version, where) + self._deep_update(params, where) + if self._limit is not None: + self._deep_update(params, interface.limit(self._limit["limit"])) + if self._order is not None: + self._deep_update(params, interface.sort(self._order)) + if values is not None: + values = interface.values(values) + if hasattr(self.model, "query_values_hook"): + values = self.model.query_values_hook(interface.api_version, values) + self._deep_update(params, values) + return params + + def _deep_update(self, query: "CiviValue", other: "CiviValue") -> None: + """Update `query` with the values in `other`, also updating contained dicts rather than overwriting them.""" + for key, val in other.items(): + if key in query and isinstance(query[key], dict) and isinstance(val, dict): + self._deep_update(query[key], val) + continue + query[key] = val + + def _fetch_all(self): + if self._result_cache is None: + query = self._compose() + results = self._interface().execute("get", self._entity, query) + self._result_cache = list(map(self.model, results["values"])) + + +class Query(BaseQuery): + def all(self): + self._fetch_all() + return self._result_cache + + def filter(self, **kwargs): + query = self._chain() + if query._filter: + query._filter.update(kwargs) + else: + query._filter = kwargs + return query + + def values(self, *args): + query = self._chain() + query._select = args + return query + + def order_by(self, **kwargs): + query = self._chain() + query._order = kwargs + return query + + def create(self, **kwargs): + query = self._interface().values(kwargs) + response = self._interface().execute("create", self._entity, query) + return response + + def delete(self): + query = self._compose() + response = self._interface().execute("delete", self._entity, query) + return response + + def update(self, **kwargs): + query = self._compose(kwargs) + response = self._interface().execute("update", self._entity, query) + return response diff --git a/civipy/interface/v3.py b/civipy/interface/v3.py index d475f70..5f3dc67 100644 --- a/civipy/interface/v3.py +++ b/civipy/interface/v3.py @@ -13,7 +13,9 @@ class V3Interface(BaseInterface): This is the v3 API interface.""" ) - def __call__(self, action: str, entity: str, params: CiviValue) -> CiviV3Response: + api_version = "3" + + def execute(self, action: str, entity: str, params: CiviValue) -> CiviV3Response: if self.func is None: if SETTINGS.api_version != "3": raise CiviProgrammingError(f"API version '{SETTINGS.api_version}' cannot use V3Interface") @@ -25,11 +27,15 @@ def __call__(self, action: str, entity: str, params: CiviValue) -> CiviV3Respons self.func = self.run_cv_cli_process else: raise CiviProgrammingError(f"API type '{SETTINGS.api_type}' not implemented") + # in v3, the 'update' action is deprecated and we are instructed to use 'create' with an id. + if action == "update" and "id" in params: + action = "create" return self.func(action, entity, params) def http_request(self, action: str, entity: str, kwargs: CiviValue) -> CiviV3Response: # v3 see https://docs.civicrm.org/dev/en/latest/api/v3/rest/ - params = self._params(entity, action, kwargs) + kwargs = self._pre_process(kwargs) + params = self._http_params(entity, action, kwargs) # header for v3 API per https://docs.civicrm.org/dev/en/latest/api/v3/rest/#x-requested-with headers = {"X-Requested-With": "XMLHttpRequest"} @@ -42,7 +48,7 @@ def http_request(self, action: str, entity: str, kwargs: CiviValue) -> CiviV3Res return self.process_http_response(response) @staticmethod - def _params(entity: str, action: str, kwargs: CiviValue) -> CiviValue: + def _http_params(entity: str, action: str, kwargs: CiviValue) -> CiviValue: params = { "entity": entity, "action": action, @@ -67,6 +73,7 @@ def process_http_response(self, response: urllib3.BaseHTTPResponse) -> CiviV3Res raise CiviHTTPError(response) def run_cv_cli_process(self, action: str, entity: str, params: CiviValue) -> CiviV3Response: + params = self._pre_process(params) # cli.php -e entity -a action [-u user] [-s site] [--output|--json] [PARAMS] params = ["--%s=%s" % (k, v) for k, v in params.items()] process = subprocess.run( @@ -75,6 +82,7 @@ def run_cv_cli_process(self, action: str, entity: str, params: CiviValue) -> Civ return self.process_json_response(json.loads(process.stdout.decode("UTF-8"))) def run_drush_or_wp_process(self, action: str, entity: str, params: CiviValue) -> CiviV3Response: + params = self._pre_process(params) process = subprocess.run( [SETTINGS.rest_base, "civicrm-api", "--out=json", "--in=json", "%s.%s" % (entity, action)], capture_output=True, @@ -82,6 +90,11 @@ def run_drush_or_wp_process(self, action: str, entity: str, params: CiviValue) - ) return self.process_json_response(json.loads(process.stdout.decode("UTF-8"))) + @staticmethod + def _pre_process(params: CiviValue) -> CiviValue: + if "options" in params: + params["options"] = json.dumps(params["options"], separators=(",", ":")) + @staticmethod def process_json_response(data: CiviV3Response) -> CiviV3Response: if "is_error" in data and data["is_error"] == 1: @@ -89,11 +102,42 @@ def process_json_response(data: CiviV3Response) -> CiviV3Response: return data @staticmethod - def limit(value: int) -> CiviValue: - return {"options": '{"limit":' + str(value) + "}"} + def select(fields: list[str]) -> CiviValue: + return {"return": fields} + + @staticmethod + def sort(kwargs: CiviValue) -> CiviValue: + option = [] + for k, v in kwargs.items(): + if isinstance(v, str) and v.upper() in ("ASC", "DESC"): + option.append(k if v.upper() == "ASC" else f"{k} DESC") + elif isinstance(v, int) and v in (0, 1): + option.append(k if v else f"{k} DESC") + else: + raise CiviProgrammingError(f"Invalid sort value for {k}: {repr(v)}") + return {"options": {"sort": option}} @staticmethod - def where(kwargs: CiviValue) -> CiviValue: + def limit(value: int, offset: int | None = None) -> CiviValue: + option = {"limit": value} + if offset is not None: + option["offset"] = offset + return {"options": option} + + @classmethod + def where(cls, kwargs: CiviValue) -> CiviValue: + option = {} + for key, val in kwargs.items(): + parts = key.split("__") + if len(parts) > 1 and parts[-1] in cls.operators: + *parts, op = parts + if op == "isnull": + val = {"IS NULL": 1} if val else {"IS NOT NULL": 1} + elif (op.endswith("between") or op.endswith("in")) and not isinstance(val, list): + raise CiviProgrammingError(f"Must provide a list for `in` or `between` operators.") + else: + val = {cls.operators[op]: val} + option[".".join(parts)] = val return kwargs @staticmethod diff --git a/civipy/interface/v4.py b/civipy/interface/v4.py index a34ea2c..3c32d92 100644 --- a/civipy/interface/v4.py +++ b/civipy/interface/v4.py @@ -14,7 +14,9 @@ class V4Interface(BaseInterface): This is the v4 API interface.""" ) - def __call__(self, action: str, entity: str, params: CiviV4Request) -> CiviV4Response: + api_version = "4" + + def execute(self, action: str, entity: str, params: CiviValue) -> CiviV4Response: if self.func is None: if SETTINGS.api_version != "4": raise CiviProgrammingError(f"API version '{SETTINGS.api_version}' cannot use V4Interface") @@ -81,12 +83,55 @@ def process_json_response(data: CiviV4Response) -> CiviV4Response: return data @staticmethod - def limit(value: int) -> CiviV4Request: - return {"limit": value} + def select(fields: list[str]) -> CiviV4Request: + return {"select": fields} + + @staticmethod + def sort(kwargs: CiviValue) -> CiviValue | CiviV4Request: + option = {} + for k, v in kwargs.items(): + if isinstance(v, str) and v.upper() in ("ASC", "DESC"): + option[k] = v.upper() + elif isinstance(v, int) and v in (0, 1): + option[k] = ("DESC", "ASC")[v] + else: + raise CiviProgrammingError(f"Invalid sort value for {k}: {repr(v)}") + return {"orderBy": option} @staticmethod - def where(kwargs: CiviValue) -> CiviV4Request: - return {"where": [[k, "=", v] for k, v in kwargs.items()]} + def limit(value: int, offset: int | None = None) -> CiviV4Request: + option = {"limit": value} + if offset is not None: + option["offset"] = offset + return option + + operators = BaseInterface.operators | { + "contains": "CONTAINS", + "not_contains": "NOT CONTAINS", + "isempty": "IS EMPTY", + "regexp": "REGEXP", + "not_regexp": "NOT REGEXP", + } + + @classmethod + def where(cls, kwargs: CiviValue) -> CiviV4Request: + option = [] + for key, val in kwargs.items(): + parts = key.split("__") + if len(parts) > 1 and parts[-1] in cls.operators: + *parts, op = parts + if op == "isnull": + value = ["IS NULL"] if val else ["IS NOT NULL"] + elif op == "isempty": + value = ["IS EMPTY"] if val else ["IS NOT EMPTY"] + elif (op.endswith("between") or op.endswith("in")) and not isinstance(val, list): + raise CiviProgrammingError(f"Must provide a list for `in` or `between` operators.") + else: + value = [cls.operators[op], val] + else: + value = ["=", val] + option.append([".".join(parts)] + value) + return {"where": option} @staticmethod def values(kwargs: CiviValue) -> CiviV4Request: diff --git a/civipy/membership.py b/civipy/membership.py index 6b59ee0..54a37fc 100644 --- a/civipy/membership.py +++ b/civipy/membership.py @@ -1,6 +1,9 @@ from datetime import datetime +from typing import Literal from civipy.base.base import CiviCRMBase from civipy.contribution import CiviContribution +from civipy.exceptions import CiviProgrammingError +from civipy.interface import CiviValue class CiviMembershipPayment(CiviCRMBase): @@ -22,3 +25,19 @@ def apply_contribution(self, contribution: CiviContribution): cid = contribution.id CiviMembershipPayment.create(contribution_id=cid, membership_id=self.id) self.update(end_date=end.strftime("%Y-%m-%d"), status_id=status) + + def set_status(self, status_id: int | None = None, status: str | None = None, is_override: bool = False): + if status_id is not None and status is not None: + raise CiviProgrammingError("Undefined behavior: called set_status with `status_id` and `status`.") + if status_id is None and status is None: + raise CiviProgrammingError("Called set_status with no status.") + values = {"status_id": status_id} + if is_override is True: + values["is_override"] = True + return self.objects.filter(id=self.id).update(**values) + + @classmethod + def query_values_hook(cls, version: Literal["3", "4"], query: CiviValue) -> CiviValue: + if version == "4" and "values" in query and isinstance(query["values"].get("status_id"), str): + query["values"]["status_id.name"] = query["values"].pop("status_id") + return query diff --git a/civipy/note.py b/civipy/note.py index 9d0e2ca..441fcdc 100644 --- a/civipy/note.py +++ b/civipy/note.py @@ -7,11 +7,7 @@ class CiviNote(CiviCRMBase): class CiviNotable(CiviCRMBase): def _where_for_note(self, subject: str) -> dict[str, str | int]: - return { - "entity_id": self.civi["id"], - "entity_table": self.__class__.civicrm_entity_table, - "subject": subject, - } + return {"entity_id": self.civi["id"], "entity_table": self.civicrm_entity_table, "subject": subject} def add_note(self, subject: str, note: str): return CiviNote.create(note=note, **self._where_for_note(subject)) diff --git a/civipy/user.py b/civipy/user.py index d3ed4bd..c692e7b 100644 --- a/civipy/user.py +++ b/civipy/user.py @@ -15,7 +15,7 @@ class CiviUFJoin(CiviCRMBase): class CiviUFMatch(CiviCRMBase): - """This is the table that matches WordPress users to CiviCRM Contacts. + """This is the table that matches host system users to CiviCRM Contacts. create requires uf_id, uf_name, and contact_id @@ -28,7 +28,7 @@ class CiviUFMatch(CiviCRMBase): """ @classmethod - def find_wp(cls, contact_ids): + def find_system_users(cls, contact_ids: list[int]) -> list["CiviUFMatch"]: result = [] for contact_id in set(contact_ids): found = cls.find(contact_id=contact_id) @@ -45,10 +45,12 @@ def find_wp(cls, contact_ids): result.civi[attr] = int(result.civi[attr]) return result - def update_wp_user(self, wp_user_id): - payload = self.civi - payload["uf_id"] = wp_user_id - self.update(**payload) + def update_system_user(self, user_id: int): + return self.update(**self.civi, uf_id=user_id) + + @classmethod + def connect(cls, host_user: int, contact_id: int, domain_id: int = 1): + return cls.objects.create(domain_id=domain_id, uf_id=host_user, contact_id=contact_id) class CiviUser(CiviCRMBase):