diff --git a/target_actionkit/client.py b/target_actionkit/client.py index 9791bfc..2391646 100644 --- a/target_actionkit/client.py +++ b/target_actionkit/client.py @@ -20,6 +20,8 @@ def __init__( ) -> None: super().__init__(target, stream_name, schema, key_properties) self.__auth = ActionKitAuth(dict(self.config)) + self.lists = None + self.initialize_lists() @property def base_url(self): @@ -27,8 +29,9 @@ def base_url(self): def validate_response(self, response: requests.Response) -> None: """Validate HTTP response.""" + msg = self.response_error_message(response) if response.status_code in [409]: - msg = response.reason + msg = f"{msg}. reason: {response.reason}" raise FatalAPIError(msg) elif response.status_code in [429] or 500 <= response.status_code < 600: msg = self.response_error_message(response) @@ -47,3 +50,24 @@ def prepare_request_headers(self): "Accept": "application/json", "Authorization": self.__auth(requests.Request()) } + + def initialize_lists(self): + if getattr(self, "lists"): + return + self.lists = [] + list_url = f"list/" + params = "?_limit=100" + next_url = f"{list_url}{params}" + + while next_url: + response = self.request_api("GET", endpoint=next_url, headers=self.prepare_request_headers()) + response_data = response.json() + self.lists.extend(response_data.get("objects", [])) + params: str = response_data.get("meta", {}).get("next", "") + if params: + params = params.split("/")[-1] + next_url = f"{list_url}{params}" + else: + next_url = None + + self.map_list_name_to_id = {l["name"]: l["id"] for l in self.lists} diff --git a/target_actionkit/sinks.py b/target_actionkit/sinks.py index 802bf78..805f7fb 100644 --- a/target_actionkit/sinks.py +++ b/target_actionkit/sinks.py @@ -34,11 +34,84 @@ def add_phone_numbers(self, user_id: str, record: dict): request_data={"phones": existing_phones}, headers=self.prepare_request_headers() ) + + def get_subscribed_lists(self, user_id): + subscribed_list_ids = [] + if user_id: + subscribed_lists = [] + subscriptions_url = f"subscription/" + params = f"?user={user_id}&_limit=100" + next_url = f"{subscriptions_url}{params}" + + while next_url: + response = self.request_api("GET", endpoint=next_url, headers=self.prepare_request_headers()) + response_data = response.json() + subscribed_lists.extend(response_data.get("objects", [])) + params: str = response_data.get("meta", {}).get("next", "") + if params: + params = params.split("/")[-1] + next_url = f"{subscriptions_url}{params}" + else: + next_url = None + + for subscribed_obj in subscribed_lists: + list_id = subscribed_obj["list"].split("/")[-2] + response = self.request_api("GET", endpoint=f"list/{list_id}/", headers=self.prepare_request_headers()) + subscribed_list_ids.append(response.json()["id"]) + + return subscribed_list_ids + + def add_lists(self, user_email: str, lists: list = None): + if lists and isinstance(lists, list): + self.logger.info(f"add lists to user: {user_email}. lists: {lists}") + return self.request_api( + "POST", + request_data={ + "email": user_email, + "page": "signup", + "lists": lists + }, + endpoint="action", + headers=self.prepare_request_headers() + ) + + def remove_lists(self, user_email: str, lists: list = None): + if lists and isinstance(lists, list): + self.logger.info(f"add lists to user: {user_email}. lists: {lists}") + return self.request_api( + "POST", + request_data={ + "email": user_email, + "page": "unsubscribe", + "lists": lists + }, + endpoint="action", + headers=self.prepare_request_headers() + ) + + def create_list(self, list_name: str): + if list_name and isinstance(list_name, str): + self.logger.info(f"creating list: {list_name}") + response = self.request_api( + "POST", + request_data={ + "name": list_name + }, + endpoint="list", + headers=self.prepare_request_headers() + ) + if response.ok: + response = self.request_api("GET", endpoint="list", params={"name": list_name}, headers=self.prepare_request_headers()) + res_json = response.json() + list_id = res_json.get("objects")[0].get("id") + self.map_list_name_to_id[list_name] = list_id def upsert_record(self, record: dict, context: dict): state_dict = dict() if record.get("email"): + if record.get("error"): + raise Exception(record.get("error")) search_response = self.request_api( "GET", endpoint=f"user/?email={record['email']}", @@ -46,9 +119,17 @@ def upsert_record(self, record: dict, context: dict): ) existing_users = search_response.json().get("objects", []) + subscription_status = record.pop("subscription_status") if existing_users: user_id = existing_users[0].get("id") + subscribed_lists = self.get_subscribed_lists(user_id) + lists = record.get("lists", []) + to_subscribe = list(set(lists) - set(subscribed_lists)) + self.add_lists(record["email"], to_subscribe) + if subscription_status == "unsubscribed": + to_unsubscribe = list(set(subscribed_lists) - set(lists)) + self.remove_lists(record["email"], to_unsubscribe) response = self.request_api( "PATCH", request_data=record, @@ -73,6 +154,7 @@ def upsert_record(self, record: dict, context: dict): state_dict["success"] = True id = response.headers['Location'].replace(f"https://{self.config.get('hostname')}.actionkit.com/rest/v1/user/", "")[:-1] self.logger.info(id) + self.add_lists(record["email"], record.get("lists")) self.add_phone_numbers(id, record) return id, response.ok, state_dict @@ -82,20 +164,41 @@ def preprocess_record(self, record: dict, context: dict) -> dict: payload = { "first_name": record.get("first_name"), "last_name": record.get("last_name"), - "email": record.get("email") + "email": record.get("email"), + "subscription_status": record.get("subscription_status"), } - if "addresses" in record and isinstance(record["addresses"], list): for address in record["addresses"]: + zip_code = postal_code = address.get("postal_code") + if isinstance(postal_code, str) and len(postal_code) > 5: + zip_code = postal_code[:5] + payload.update({ "address1": address.get("line1"), "city": address.get("city"), "state": address.get("state"), "region": address.get("state"), - "postal": address.get("postal_code"), - "zip": address.get("postal_code"), + "postal": postal_code, + "zip": zip_code, "country": address.get("country") }) break + if "lists" in record and isinstance(record["lists"], list): + # list of ids + for list_name in record["lists"]: + if list_name in self.map_list_name_to_id: + continue + self.create_list(list_name) + + payload["lists"] = [ + self.map_list_name_to_id[l] + for l in record["lists"] + ] + if "custom_fields" in record and isinstance(record["custom_fields"], list): + payload["fields"] = { + custom_field["name"]: custom_field["value"] + for custom_field in record["custom_fields"] + } + return payload