diff --git a/target_actionkit/client.py b/target_actionkit/client.py
index 9791bfc..2391646 100644
--- a/target_actionkit/client.py
+++ b/target_actionkit/client.py
@@ -20,6 +20,8 @@ def __init__(
     ) -> None:
         super().__init__(target, stream_name, schema, key_properties)
         self.__auth = ActionKitAuth(dict(self.config))
+        self.lists = None
+        self.initialize_lists()
 
     @property
     def base_url(self):
@@ -27,8 +29,9 @@ def base_url(self):
 
     def validate_response(self, response: requests.Response) -> None:
         """Validate HTTP response."""
+        msg = self.response_error_message(response)
         if response.status_code in [409]:
-            msg = response.reason
+            msg = f"{msg}. reason: {response.reason}"
             raise FatalAPIError(msg)
         elif response.status_code in [429] or 500 <= response.status_code < 600:
             msg = self.response_error_message(response)
@@ -47,3 +50,24 @@ def prepare_request_headers(self):
             "Accept": "application/json",
             "Authorization": self.__auth(requests.Request())
         }
+    
+    def initialize_lists(self):
+        if getattr(self, "lists"):
+            return
+        self.lists = []
+        list_url = f"list/"
+        params = "?_limit=100"
+        next_url = f"{list_url}{params}"
+
+        while next_url:
+            response = self.request_api("GET", endpoint=next_url, headers=self.prepare_request_headers())
+            response_data = response.json()
+            self.lists.extend(response_data.get("objects", []))
+            params: str = response_data.get("meta", {}).get("next", "")
+            if params:
+                params = params.split("/")[-1]
+                next_url = f"{list_url}{params}"
+            else:
+                next_url = None
+        
+        self.map_list_name_to_id = {l["name"]: l["id"] for l in self.lists}
diff --git a/target_actionkit/sinks.py b/target_actionkit/sinks.py
index 802bf78..805f7fb 100644
--- a/target_actionkit/sinks.py
+++ b/target_actionkit/sinks.py
@@ -34,11 +34,84 @@ def add_phone_numbers(self, user_id: str, record: dict):
                 request_data={"phones": existing_phones},
                 headers=self.prepare_request_headers()
             )
+    
+    def get_subscribed_lists(self, user_id):
+        subscribed_list_ids = []
+        if user_id:
+            subscribed_lists = []
+            subscriptions_url = f"subscription/"
+            params = f"?user={user_id}&_limit=100"
+            next_url = f"{subscriptions_url}{params}"
+
+            while next_url:
+                response = self.request_api("GET", endpoint=next_url, headers=self.prepare_request_headers())
+                response_data = response.json()
+                subscribed_lists.extend(response_data.get("objects", []))
+                params: str = response_data.get("meta", {}).get("next", "")
+                if params:
+                    params = params.split("/")[-1]
+                    next_url = f"{subscriptions_url}{params}"
+                else:
+                    next_url = None
+
+            for subscribed_obj in subscribed_lists:
+                list_id = subscribed_obj["list"].split("/")[-2]
+                response = self.request_api("GET", endpoint=f"list/{list_id}/", headers=self.prepare_request_headers())
+                subscribed_list_ids.append(response.json()["id"])
+
+        return subscribed_list_ids
+    
+    def add_lists(self, user_email: str, lists: list = None):
+        if lists and isinstance(lists, list):
+            self.logger.info(f"add lists to user: {user_email}. lists: {lists}")
+            return self.request_api(
+                "POST",
+                request_data={
+                    "email": user_email,
+                    "page": "signup",
+                    "lists": lists
+                },
+                endpoint="action",
+                headers=self.prepare_request_headers()
+            )
+    
+    def remove_lists(self, user_email: str, lists: list = None):
+        if lists and isinstance(lists, list):
+            self.logger.info(f"add lists to user: {user_email}. lists: {lists}")
+            return self.request_api(
+                "POST",
+                request_data={
+                    "email": user_email,
+                    "page": "unsubscribe",
+                    "lists": lists
+                },
+                endpoint="action",
+                headers=self.prepare_request_headers()
+            )
+    
+    def create_list(self, list_name: str):
+        if list_name and isinstance(list_name, str):
+            self.logger.info(f"creating list: {list_name}")
+            response = self.request_api(
+                "POST",
+                request_data={
+                    "name": list_name
+                },
+                endpoint="list",
+                headers=self.prepare_request_headers()
+            )
+            if response.ok:
+                response = self.request_api("GET", endpoint="list", params={"name": list_name}, headers=self.prepare_request_headers())
+                res_json = response.json()
+                list_id = res_json.get("objects")[0].get("id")
+                self.map_list_name_to_id[list_name] = list_id
 
     def upsert_record(self, record: dict, context: dict):
         state_dict = dict()
 
         if record.get("email"):
+            if record.get("error"):
+                raise Exception(record.get("error"))
             search_response = self.request_api(
                 "GET",
                 endpoint=f"user/?email={record['email']}",
@@ -46,9 +119,17 @@ def upsert_record(self, record: dict, context: dict):
             )
             
             existing_users = search_response.json().get("objects", [])
+            subscription_status = record.pop("subscription_status")
             
             if existing_users:
                 user_id = existing_users[0].get("id")
+                subscribed_lists = self.get_subscribed_lists(user_id)
+                lists = record.get("lists", [])
+                to_subscribe = list(set(lists) - set(subscribed_lists))
+                self.add_lists(record["email"], to_subscribe)
+                if subscription_status == "unsubscribed":
+                    to_unsubscribe = list(set(subscribed_lists) - set(lists))
+                    self.remove_lists(record["email"], to_unsubscribe)
                 response = self.request_api(
                     "PATCH",
                     request_data=record,
@@ -73,6 +154,7 @@ def upsert_record(self, record: dict, context: dict):
             state_dict["success"] = True
             id = response.headers['Location'].replace(f"https://{self.config.get('hostname')}.actionkit.com/rest/v1/user/", "")[:-1]
             self.logger.info(id)
+            self.add_lists(record["email"], record.get("lists"))
             self.add_phone_numbers(id, record)
             return id, response.ok, state_dict
         
@@ -82,20 +164,41 @@ def preprocess_record(self, record: dict, context: dict) -> dict:
         payload = {
             "first_name": record.get("first_name"),
             "last_name": record.get("last_name"),
-            "email": record.get("email")
+            "email": record.get("email"),
+            "subscription_status": record.get("subscription_status"),
         }
-
         if "addresses" in record and isinstance(record["addresses"], list):
             for address in record["addresses"]:
+                zip_code = postal_code = address.get("postal_code")
+                if isinstance(postal_code, str) and len(postal_code) > 5:
+                    zip_code = postal_code[:5]
+
                 payload.update({
                     "address1": address.get("line1"),
                     "city": address.get("city"),
                     "state": address.get("state"),
                     "region": address.get("state"),
-                    "postal": address.get("postal_code"),
-                    "zip": address.get("postal_code"),
+                    "postal": postal_code,
+                    "zip": zip_code,
                     "country": address.get("country")
                 })
                 break
 
+        if "lists" in record and isinstance(record["lists"], list):
+            # list of ids
+            for list_name in record["lists"]:
+                if list_name in self.map_list_name_to_id:
+                    continue
+                self.create_list(list_name)
+                
+            payload["lists"] = [
+                self.map_list_name_to_id[l]
+                for l in record["lists"]
+            ]
+        if "custom_fields" in record and isinstance(record["custom_fields"], list):
+            payload["fields"] = {
+                custom_field["name"]: custom_field["value"]
+                for custom_field in record["custom_fields"]
+            }
+
         return payload