Skip to content

Commit

Permalink
add additional filter and order-by options to queries; implement Djan…
Browse files Browse the repository at this point in the history
…go-style Manager `.objects` on models
  • Loading branch information
indepndnt committed May 11, 2024
1 parent 948a2fc commit cfbd18b
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 74 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
### 0.1.0 [pending release]

- Drop deprecated model methods

### 0.0.3 [pending release]

- Add CiviCRM v4 API usage and make it the default
- Drop allowing `CiviCRMBase` to be used directly (must subclass to access a CiviCRM entity)
- Update settings to automatically search for a `.civipy` or `pyproject.toml` file
- "Drop" Python 3.8 "support"
- Implement Django-style `.objects` Manager on object models
- Deprecate model-level `find`, `get`, `create`, etc. methods

### 0.0.2 January 13, 2024

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.2
0.0.3
67 changes: 45 additions & 22 deletions civipy/base/base.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,65 @@
import json
from typing import TypeVar
from warnings import warn
from civipy.base.config import logger
from civipy.base.utils import get_unique
from civipy.exceptions import CiviProgrammingError
from civipy.interface import get_interface, CiviValue, CiviResponse, Interface
from civipy.interface import CiviValue, CiviResponse
from civipy.interface.query import Query

CiviEntity = TypeVar("CiviEntity", bound="CiviCRMBase")


class CiviCRMBase:
class MetaCiviCRM(type):
def __new__(typ, name, bases, dct):
cls = super().__new__(typ, name, bases, dct)
cls.objects = Query(cls)
return cls


class CiviCRMBase(metaclass=MetaCiviCRM):
objects: Query

@classmethod
def get(cls, **kwargs) -> CiviResponse:
"""Make an API request with the "get" action and return the full response."""
query = cls._interface().limit(25)
warn("model.get will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
query = cls.objects._interface().limit(25)
query.update(kwargs)
return cls.action("get", **query)

@classmethod
def create(cls, **kwargs: CiviValue) -> CiviEntity:
"""Make an API request with the "create" action and return an object of class cls
populated with the created object's data."""
query = cls._interface().values(kwargs)
warn("model.create will be removed in v0.1.0, use model.save", DeprecationWarning, stacklevel=2)
query = cls.objects._interface().values(kwargs)
response = cls.action("create", **query)
logger.debug("new record created! full response: %s" % str(response))
return cls(get_unique(response))

def update(self: CiviEntity, **kwargs: CiviValue) -> CiviResponse:
"""Update the current object with the values specified in kwargs. Returns the full
API response."""
self.civi.update(kwargs)
query = self._interface().where({"id": self.civi_id})
query.update(self._interface().values(kwargs))
return self.action("create", **query)
warn("model.update will be removed in v0.1.0, use model.save", DeprecationWarning, stacklevel=2)
kwargs["id"] = self.civi_id
query = self.objects._interface().values(kwargs)
return self.action("update", **query)

def save(self: CiviEntity) -> CiviEntity:
"""Save the current instance."""
action = "create" if self.civi_id is None else "update"
query = self.objects._interface().values(self.civi)
response = self.action(action, **query)
return self.__class__(get_unique(response))

def delete(self: CiviEntity, check_permissions: bool = True, use_trash: bool = True) -> CiviResponse:
query = self.objects._interface().where({"id": self.civi_id})
if check_permissions is False:
query["checkPermissions"] = False
if use_trash is False:
query["useTrash"] = False
return self.action("delete", **query)

@classmethod
def find(cls, select: list[str] | None = None, **kwargs: CiviValue) -> CiviEntity | None:
Expand All @@ -41,7 +69,8 @@ def find(cls, select: list[str] | None = None, **kwargs: CiviValue) -> CiviEntit
Returns an object of class cls populated with this object's data if found, otherwise
returns None."""
query = cls._interface().where(kwargs)
warn("model.find will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
query = cls.objects._interface().where(kwargs)
if select:
query["select"] = select
response = cls.get(**query)
Expand All @@ -57,7 +86,8 @@ def find_all(cls, select: list[str] | None = None, **kwargs: CiviValue) -> list[
Returns a list of objects of class cls populated with data. Returns an empty list
if no matching values found."""
query = cls._interface().where(kwargs)
warn("model.find_all will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
query = cls.objects._interface().where(kwargs)
if select:
query["select"] = select
response = cls.action("get", **query)
Expand All @@ -72,7 +102,8 @@ def find_and_update(cls, where: CiviValue, **kwargs: CiviValue) -> CiviEntity |
Returns an object of class cls populated with this object's data if found, otherwise
returns None."""
query = cls._interface().where(where)
warn("model.find_and_update will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
query = cls.objects._interface().where(where)
response = cls.get(**query)
if response["count"] == 0:
return None
Expand All @@ -97,6 +128,7 @@ def find_or_create(cls, where: CiviValue, do_update: bool = False, **kwargs: Civ
Returns an object of class cls populated with the found, updated, or created
object's data."""
warn("model.find_or_create will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
if do_update:
obj = cls.find_and_update(where, **kwargs)
else:
Expand All @@ -111,20 +143,11 @@ def find_or_create(cls, where: CiviValue, do_update: bool = False, **kwargs: Civ
@classmethod
def action(cls, action: str, **kwargs) -> CiviResponse:
"""Calls the CiviCRM API action and returns parsed JSON on success."""
warn("model.action will be removed in v0.1.0, use model.objects methods", DeprecationWarning, stacklevel=2)
entity = cls.__name__[4:]
if entity == "CRMBase":
raise CiviProgrammingError("Subclass CiviCRMBase to create an unsupported Entity.")
return cls._interface()(action, entity, kwargs)

_interface_reference: Interface | None = None

@classmethod
def _interface(cls) -> Interface:
"""Instantiate the appropriate API interface and store it on the CiviCRMBase class,
so that it will be instantiated only once and available to all entity classes."""
if CiviCRMBase._interface_reference is None:
CiviCRMBase._interface_reference = get_interface()
return CiviCRMBase._interface_reference
return cls.objects._interface().execute(action, entity, kwargs)

def pprint(self: CiviEntity) -> None:
"""Print the current record's data in a human-friendly format."""
Expand Down
41 changes: 18 additions & 23 deletions civipy/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,26 @@

def get_unique(response: "CiviResponse") -> "CiviValue":
"""Enforce that exactly one record was returned in `response` and return just the record."""
if isinstance(response["values"], dict):
ident = list(response["values"].keys())[0]
return get_unique_value_for_id(ident, response)
else:
return get_unique_value(response)


def get_unique_value(response: "CiviResponse") -> "CiviValue":
"""Return one value from a non-indexed response."""
assert_unique(response)
return response["values"][0]


def get_unique_value_for_id(record_id: str, response: "CiviResponse") -> "CiviValue":
"""Return one value from an indexed response."""
assert_unique(response)
return response["values"][record_id]


def assert_unique(response: "CiviResponse") -> None:
"""Raise exception if `response` does not have exactly one record."""
if not response or "count" not in response:
if not response or "count" not in response or "values" not in response:
raise CiviProgrammingError(f"Unexpected response {response}")
record_id = next(iter(response["values"].keys())) if isinstance(response["values"], dict) else 0

# Raise exception if `response` does not have exactly one record.
count = response["count"]
if count == 0:
raise NoResultError("No results in response.")
if count > 1:
if count > 1 and not is_identical(response["values"]):
raise NonUniqueResultError(f"Response is not unique, has {count} results.")

return response["values"][record_id]


def is_identical(values: list["CiviValue"] | dict[str, "CiviValue"]) -> bool:
"""Check if multiple returned values are the same item and can be treated as unique."""
# I have experienced the V4 REST API returning two instances of a record. The request was
# `CiviContact.find(select=["*", "email_primary.email"], id=11722)`. This was likely due
# to the join to the email table, so I'm treating this as an implicit "select distinct"
if isinstance(values, dict):
values = list(values.values())
first = values[0]
return all((value == first for value in values[1:]))
55 changes: 52 additions & 3 deletions civipy/contact.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
from civipy.base.base import CiviCRMBase
from typing import Iterable, Literal
from civipy.base.base import CiviCRMBase, CiviEntity
from civipy.address import CiviCountry
from civipy.exceptions import CiviProgrammingError
from civipy.interface import CiviValue
from civipy.note import CiviNotable


Expand All @@ -10,7 +13,22 @@ class CiviContact(CiviNotable):
@classmethod
def find_by_email(cls, email_address: str, select: list[str] | None = None):
email_obj = CiviEmail.find(select=["contact_id"], email=email_address)
return cls.find(select=select, id=email_obj["contact_id"])
return cls.find(select=select, id=email_obj.civi["contact_id"])

@classmethod
def find_all_by_email(cls, email_address: str, select: list[str] | None = None):
return [
cls.find(select=select, id=email_obj.civi["contact_id"])
for email_obj in CiviEmail.find_all(select=["contact_id"], email=email_address)
]

@classmethod
def create(cls, **kwargs: CiviValue) -> "CiviContact":
country_code: str | None = kwargs.pop("country_code", None)
if country_code is not None:
country = CiviCountry.find_by_country_code(country_code=country_code, select=["id"])
kwargs["country_id"] = country.id
return super().create(**kwargs)


class CiviEmail(CiviCRMBase):
Expand All @@ -26,6 +44,28 @@ class CiviWebsite(CiviCRMBase):


class CiviRelationship(CiviCRMBase):
@classmethod
def find_all(cls, select: list[str] | None = None, **kwargs: CiviValue) -> list[CiviEntity]:
if select is None:
return cls.objects.filter(**kwargs).all()
return cls.objects.filter(**kwargs).values(*select).all()

@classmethod
def query_filter_hook(cls, version: Literal["3", "4"], query: CiviValue) -> CiviValue:
if version == "3" or "where" not in query:
return query
contact_id = None
parts = []
for k, c, v in query["where"]:
if k == "contact_id":
contact_id = v
parts.append(["OR", [["contact_id_a", "=", contact_id], ["contact_id_b", "=", contact_id]]])
continue
parts.append([k, c, v])
if contact_id is not None:
query["where"] = parts
return query

@classmethod
def create_or_increment_relationship(
cls,
Expand Down Expand Up @@ -99,8 +139,17 @@ class CiviGroupContact(CiviCRMBase):
class CiviGroup(CiviCRMBase):
@classmethod
def find_by_title(cls, title: str, select: list[str] | None = None) -> "CiviGroup":
"""Creates a new CiviGroup object populated with data for the group entitled "title"."""
"""Finds a CiviGroup object for the group entitled "title"."""
return cls.find(select=select, title=title)

def update_all(self, contacts: Iterable[int | CiviContact]) -> None:
query = CiviGroupContact.objects.filter(group_id=self.id).values("contact_id")
current_pop = {c.contact_id for c in query}
contact_ids = {c.id if isinstance(c, CiviContact) else c for c in contacts}
for contact_id in current_pop.difference(contact_ids):
CiviGroupContact.objects.filter(group_id=self.id, contact_id=contact_id).delete()
for contact_id in contact_ids.difference(current_pop):
CiviGroupContact.objects.create(group_id=self.id, contact_id=contact_id)

def add_member(self, civi_contact: CiviContact) -> CiviGroupContact:
return CiviGroupContact.find_or_create(where={"contact_id": civi_contact.id, "group_id": self.id})
4 changes: 2 additions & 2 deletions civipy/financial.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ def cancel(cls, **kwargs):
return cls.action("cancel", **kwargs)

@classmethod
def find_by_transaction_id(cls, trxn_id: str) -> "CiviFinancialTrxn | None":
def find_by_transaction_id(cls, trxn_id: str, entity_table: str) -> "CiviEntityFinancialTrxn | None":
"""Find a Contribution Payment by payment transaction ID"""
kwargs = {
"select": ["*", "financial_trxn_id.*"],
"entity_table": "civicrm_contribution",
"entity_table": entity_table,
"financial_trxn_id.trxn_id": trxn_id,
}
found = cls.find_all(**kwargs)
Expand Down
36 changes: 35 additions & 1 deletion civipy/interface/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TypedDict, Literal, Callable
from warnings import warn

CiviValue = dict[str, int | str]
CiviV3Request = CiviValue
Expand Down Expand Up @@ -60,14 +61,47 @@ class BaseInterface:
The `values` method is a helper function to generate parameters for create/update.
"""

api_version: str = ""

def __init__(self):
self.func: Callable[[str, str, CiviValue], CiviResponse] | None = None

def __call__(self, action: str, entity: str, params: CiviValue) -> CiviResponse:
warn(
"interface.__call__ will be removed in v0.1.0, use interface.execute instead",
DeprecationWarning,
stacklevel=2
)
return self.execute(action, entity, params)

def execute(self, action: str, entity: str, params: CiviValue) -> CiviResponse:
raise NotImplementedError

operators = {
"lte": "<=",
"gte": ">=",
"lt": "<",
"gt": ">",
"ne": "!=",
"like": "LIKE",
"not_like": "NOT LIKE",
"in": "IN",
"not_in": "NOT IN",
"between": "BETWEEN",
"not_between": "NOT BETWEEN",
"isnull": "IS NULL",
}

@staticmethod
def select(fields: list[str]) -> CiviValue | CiviV4Request:
raise NotImplementedError

@staticmethod
def sort(kwargs: CiviValue) -> CiviValue | CiviV4Request:
raise NotImplementedError

@staticmethod
def limit(value: int) -> CiviValue | CiviV4Request:
def limit(value: int, offset: int | None = None) -> CiviValue | CiviV4Request:
raise NotImplementedError

@staticmethod
Expand Down
Loading

0 comments on commit cfbd18b

Please sign in to comment.