diff --git a/app/commands/helpers/incident_helper.py b/app/commands/helpers/incident_helper.py index 4d3bc8a2..e74454f1 100644 --- a/app/commands/helpers/incident_helper.py +++ b/app/commands/helpers/incident_helper.py @@ -1,8 +1,7 @@ import json -import re import logging from integrations import google_drive -from commands.utils import get_stale_channels, log_to_sentinel +from commands.utils import get_stale_channels, log_to_sentinel, extract_google_doc_id help_text = """ \n `/sre incident create-folder ` @@ -486,18 +485,6 @@ def metadata_items(folder): ] -def extract_google_doc_id(url): - # Regular expression pattern to match Google Docs ID - pattern = r"/d/([a-zA-Z0-9_-]+)/" - - # Search in the given text for all occurences of pattern - match = re.search(pattern, url) - if match: - return match.group(1) - else: - return None - - def return_channel_name(input_str): # return the channel name without the incident- prefix and appending a # to the channel name prefix = "incident-" diff --git a/app/commands/incident.py b/app/commands/incident.py index ad0ecc21..9900a924 100644 --- a/app/commands/incident.py +++ b/app/commands/incident.py @@ -5,7 +5,17 @@ from integrations import google_drive, opsgenie from models import webhooks -from commands.utils import log_to_sentinel, get_user_locale +from commands.utils import ( + log_to_sentinel, + get_user_locale, + rearrange_by_datetime_ascending, + convert_epoch_to_datetime_est, + extract_google_doc_id, +) +from integrations.google_drive import ( + get_timeline_section, + replace_text_between_headings, +) from dotenv import load_dotenv @@ -18,6 +28,8 @@ INCIDENT_CHANNEL = os.environ.get("INCIDENT_CHANNEL") SLACK_SECURITY_USER_GROUP_ID = os.environ.get("SLACK_SECURITY_USER_GROUP_ID") +START_HEADING = "Detailed Timeline" +END_HEADING = "Trigger" def handle_incident_action_buttons(client, ack, body, logger): @@ -361,3 +373,159 @@ def generate_success_modal(body): }, ], } + + +def handle_reaction_added(client, ack, body, logger): + ack() + # get the channel in which the reaction was used + channel_id = body["event"]["item"]["channel"] + # Get the channel name which requires us to use the conversations_info API call + channel_name = client.conversations_info(channel=channel_id)["channel"]["name"] + + # if the emoji added is a floppy disk emoji and we are in an incident channel, then add the message to the incident timeline + if channel_name.startswith("incident-"): + # get the message from the conversation + try: + # get the messages from the conversation and incident channel + messages = return_messages(client, body, channel_id) + + # get the incident report document id from the incident channel + # get and update the incident document + document_id = "" + response = client.bookmarks_list(channel_id=channel_id) + if response["ok"]: + for item in range(len(response["bookmarks"])): + if response["bookmarks"][item]["title"] == "Incident report": + document_id = extract_google_doc_id( + response["bookmarks"][item]["link"] + ) + if document_id == "": + logger.error("No incident document found for this channel.") + + for message in messages: + # convert the time which is now in epoch time to standard ET Time + message_date_time = convert_epoch_to_datetime_est(message["ts"]) + # get the user name from the message + user = client.users_profile_get(user=message["user"]) + # get the full name of the user so that we include it into the timeline + user_full_name = user["profile"]["real_name"] + + # get the current timeline section content + content = get_timeline_section(document_id) + + # if the message already exists in the timeline, then don't put it there again + if content and message_date_time not in content: + # append the new message to the content + content += ( + f"{message_date_time} {user_full_name}: {message['text']}" + ) + + # if there is an image in the message, then add it to the timeline + if "files" in message: + image = message["files"][0]["url_private"] + content += f"\nImage: {image}" + + # sort all the message to be in ascending chronological order + sorted_content = rearrange_by_datetime_ascending(content) + + # replace the content in the file with the new headings + replace_text_between_headings( + document_id, sorted_content, START_HEADING, END_HEADING + ) + except Exception as e: + logger.error(e) + + +# Execute this function when a reaction was removed +def handle_reaction_removed(client, ack, body, logger): + ack() + # get the channel id + channel_id = body["event"]["item"]["channel"] + + # Get the channel name which requires us to use the conversations_info API call + result = client.conversations_info(channel=channel_id) + channel_name = result["channel"]["name"] + + if channel_name.startswith("incident-"): + try: + messages = return_messages(client, body, channel_id) + + if not messages: + logger.warning("No messages found") + return + # get the message we want to delete + message = messages[0] + + # convert the epoch time to standard ET day/time + message_date_time = convert_epoch_to_datetime_est(message["ts"]) + + # get the user of the person that send the message + user = client.users_profile_get(user=message["user"]) + # get the user's full name + user_full_name = user["profile"]["real_name"] + + # get the incident report document id from the incident channel + # get and update the incident document + document_id = "" + response = client.bookmarks_list(channel_id=channel_id) + if response["ok"]: + for item in range(len(response["bookmarks"])): + if response["bookmarks"][item]["title"] == "Incident report": + document_id = extract_google_doc_id( + response["bookmarks"][item]["link"] + ) + if document_id == "": + logger.error("No incident document found for this channel.") + + # Retrieve the current content of the timeline + content = get_timeline_section(document_id) + + # Construct the message to remove + message_to_remove = ( + f"\n{message_date_time} {user_full_name}: {message['text']}\n" + ) + # if there is a file in the message, then add it to the message to remove + if "files" in message: + image = message["files"][0]["url_private"] + message_to_remove += f"\nImage: {image}" + + # Remove the message + if message_to_remove in content: + content = content.replace(message_to_remove, "") + + # Update the timeline content + result = replace_text_between_headings( + document_id, + content, + START_HEADING, + END_HEADING, + ) + else: + logger.warning("Message not found in the timeline") + return + except Exception as e: + logger.error(e) + + +# Function to return the messages from the conversation +def return_messages(client, body, channel_id): + # Fetch the message that had the reaction removed + result = client.conversations_history( + channel=channel_id, + limit=1, + inclusive=True, + oldest=body["event"]["item"]["ts"], + ) + # get the messages + messages = result["messages"] + # if the lenght is 0, then the message is part of a thread, so get the message from the thread + if messages.__len__() == 0: + # get thread messages + result = client.conversations_replies( + channel=channel_id, + ts=body["event"]["item"]["ts"], + inclusive=True, + include_all_metadata=True, + ) + messages = result["messages"] + return messages diff --git a/app/commands/utils.py b/app/commands/utils.py index 53b95a64..d8af447d 100644 --- a/app/commands/utils.py +++ b/app/commands/utils.py @@ -2,6 +2,8 @@ import time from datetime import datetime, timedelta from integrations.sentinel import send_event +import re +import pytz logging.basicConfig(level=logging.INFO) @@ -124,3 +126,100 @@ def get_user_locale(user_id, client): if user_locale["ok"] and (user_locale["user"]["locale"] in supported_locales): return user_locale["user"]["locale"] return default_locale + + +def rearrange_by_datetime_ascending(text): + # Split the text by lines + lines = text.split("\n") + + # Temporary storage for multiline entries + entries = [] + current_entry = [] + + # Iterate over each line + for line in lines: + # Check if the line starts with a datetime format including 'ET' + if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ET", line): + if current_entry: + # Combine the lines in current_entry and add to entries + entries.append("\n".join(current_entry)) + current_entry = [line] + else: + current_entry.append(line) + else: + # If not a datetime, it's a continuation of the previous message + current_entry.append(line) + + # Add the last entry + if current_entry: + if current_entry.__len__() > 1: + # that means we have a multiline entry + joined_current_entry = "\n".join(current_entry) + entries.append(joined_current_entry) + else: + entries.append("\n".join(current_entry)) + + # Now extract date, time, and message from each entry + dated_entries = [] + for entry in entries: + match = re.match( + r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ET):?[\s,]*(.*)", entry, re.DOTALL + ) + if match: + date_str, msg = match.groups() + # Parse the datetime string (ignoring 'ET' for parsing) + dt = datetime.strptime(date_str[:-3].strip(), "%Y-%m-%d %H:%M:%S") + dated_entries.append((dt, msg)) + + # Sort the entries by datetime in ascending order + sorted_entries = sorted(dated_entries, key=lambda x: x[0], reverse=False) + + # Reformat the entries back into strings, including 'ET' + sorted_text = "\n".join( + [ + f"{entry[0].strftime('%Y-%m-%d %H:%M:%S')} ET {entry[1]}" + for entry in sorted_entries + ] + ) + + return sorted_text + + +def convert_epoch_to_datetime_est(epoch_time): + """ + Convert an epoch time to a standard date/time format in Eastern Standard Time (ET). + + Args: + epoch_time (float): The epoch time. + + Returns: + str: The corresponding date and time in the format YYYY-MM-DD HH:MM:SS ET. + """ + # Define the Eastern Standard Timezone + est = pytz.timezone("US/Eastern") + + # Convert epoch time to a datetime object in UTC + utc_datetime = datetime.utcfromtimestamp(float(epoch_time)) + + # Convert UTC datetime object to ET + est_datetime = utc_datetime.replace(tzinfo=pytz.utc).astimezone(est) + + # Format the datetime object to a string in the desired format with 'ET' at the end + return est_datetime.strftime("%Y-%m-%d %H:%M:%S") + " ET" + + +def extract_google_doc_id(url): + # if the url is empty or None, then log an error + if not url: + logging.error("URL is empty or None") + return None + + # Regular expression pattern to match Google Docs ID + pattern = r"https://docs.google.com/document/d/([a-zA-Z0-9_-]+)/" + + # Search in the given text for all occurences of pattern + match = re.search(pattern, url) + if match: + return match.group(1) + else: + return None diff --git a/app/integrations/google_drive.py b/app/integrations/google_drive.py index 9338eb78..11982c9e 100644 --- a/app/integrations/google_drive.py +++ b/app/integrations/google_drive.py @@ -13,6 +13,8 @@ SRE_INCIDENT_FOLDER = os.environ.get("SRE_INCIDENT_FOLDER") INCIDENT_TEMPLATE = os.environ.get("INCIDENT_TEMPLATE") INCIDENT_LIST = os.environ.get("INCIDENT_LIST") +START_HEADING = "Detailed Timeline" +END_HEADING = "Trigger" PICKLE_STRING = os.environ.get("PICKLE_STRING", False) @@ -291,6 +293,125 @@ def merge_data(file_id, name, product, slack_channel, on_call_names): return result +def get_timeline_section(document_id): + # Retrieve the document + service = get_google_service("docs", "v1") + document = service.documents().get(documentId=document_id).execute() + content = document.get("body").get("content") + print("In google, content is", content) + + timeline_content = "" + record = False + found_start = False + found_end = False + + # Iterate through the elements of the document + for element in content: + if "paragraph" in element: + paragraph_elements = element.get("paragraph").get("elements") + for elem in paragraph_elements: + text_run = elem.get("textRun") + if text_run: + text = text_run.get("content") + if START_HEADING in text: + record = True + found_start = True + elif END_HEADING in text: + found_end = True + if found_start: + return timeline_content + elif record: + timeline_content += text + + # Return None if either START_HEADING or END_HEADING not found + return None if not (found_start and found_end) else timeline_content + + +# Replace the text between the headings +def replace_text_between_headings(doc_id, new_content, start_heading, end_heading): + # Setup the service + service = get_google_service("docs", "v1") + + # Retrieve the document content + document = service.documents().get(documentId=doc_id).execute() + content = document.get("body").get("content") + + # Find the start and end indices + start_index = None + end_index = None + for element in content: + if "paragraph" in element: + paragraph = element.get("paragraph") + text_runs = paragraph.get("elements") + for text_run in text_runs: + text = text_run.get("textRun").get("content") + if start_heading in text: + # Set start_index to the end of the start heading + start_index = text_run.get("endIndex") + if end_heading in text and start_index is not None: + # Set end_index to the start of the end heading + end_index = text_run.get("startIndex") + break + + if start_index is not None and end_index is not None: + # Format new content with new lines for proper insertion + formatted_content = "\n" + new_content + "\n" + content_length = len(formatted_content) + + # Perform the replacement + requests = [ + { + "deleteContentRange": { + "range": {"startIndex": start_index, "endIndex": end_index} + } + }, + { + "insertText": { + "location": {"index": start_index}, + "text": formatted_content, + } + }, + ] + # Format the inserted text - we want to make sure that the font size is what we want + requests.append( + { + "updateTextStyle": { + "range": { + "startIndex": start_index, + "endIndex": ( + start_index + content_length + ), # Adjust this index based on the length of the text + }, + "textStyle": { + "fontSize": {"magnitude": 11, "unit": "PT"}, + "bold": False, + }, + "fields": "bold", + } + } + ) + # Update paragraph style to be normal text + requests.append( + { + "updateParagraphStyle": { + "range": { + "startIndex": start_index + 1, + "endIndex": ( + start_index + content_length + ), # Adjust this index based on the length of the text + }, + "paragraphStyle": {"namedStyleType": "NORMAL_TEXT"}, + "fields": "namedStyleType", + } + } + ) + service.documents().batchUpdate( + documentId=doc_id, body={"requests": requests} + ).execute() + else: + logging.warning("Headings not found") + + # Update the incident document with status of "Closed" def close_incident_document(file_id): # List of possible statuses to be replaced diff --git a/app/main.py b/app/main.py index 4ea26ac0..7a400e57 100644 --- a/app/main.py +++ b/app/main.py @@ -74,6 +74,18 @@ def main(bot): bot.action("reveal_webhook")(webhook_helper.reveal_webhook) bot.action("next_page")(webhook_helper.next_page) + # Handle event subscriptions when it matches floppy_disk + bot.event("reaction_added", matchers=[is_floppy_disk])( + incident.handle_reaction_added + ) + bot.event("reaction_removed", matchers=[is_floppy_disk])( + incident.handle_reaction_removed + ) + + # For all other reaction events that are not floppy_disk, just ack the event + bot.event("reaction_added")(just_ack_the_rest_of_reaction_events) + bot.event("reaction_removed")(just_ack_the_rest_of_reaction_events) + SocketModeHandler(bot, APP_TOKEN).connect() # Run scheduled tasks if not in dev @@ -95,3 +107,13 @@ def get_bot(): if bot: server_app.add_middleware(bot_middleware.BotMiddleware, bot=bot) server_app.add_event_handler("startup", partial(main, bot)) + + +# Make sure that we are listening only on floppy disk reaction +def is_floppy_disk(event: dict) -> bool: + return event["reaction"] == "floppy_disk" + + +# We need to ack all other reactions so that they don't get processed +def just_ack_the_rest_of_reaction_events(): + pass diff --git a/app/requirements.txt b/app/requirements.txt index 99769b16..09a383a1 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -16,6 +16,7 @@ PyJWT==2.8.0 PyYAML!=6.0.0,!=5.4.0,!=5.4.1 python-dotenv==0.21.1 python-i18n==0.3.9 +pytz==2023.3.post1 requests==2.31.0 schedule==1.2.1 slack-bolt==1.18.1 diff --git a/app/tests/commands/test_incident.py b/app/tests/commands/test_incident.py index c31727f5..acf5b987 100644 --- a/app/tests/commands/test_incident.py +++ b/app/tests/commands/test_incident.py @@ -853,6 +853,229 @@ def test_incident_submit_does_not_invite_security_group_members_already_in_chann ) +def test_handle_reaction_added_floppy_disk_reaction_in_incident_channel(): + logger = MagicMock() + mock_client = MagicMock() + + # Set up mock client and body to simulate the scenario + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.conversations_history.return_value = { + "messages": [{"ts": "123456", "user": "U123456"}] + } + mock_client.users_profile_get.return_value = {"profile": {"real_name": "John Doe"}} + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + # Assert the correct API calls were made + mock_client.conversations_info.assert_called_once() + + +def test_handle_reaction_added_non_incident_channel(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "general"}} + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + # Assert that certain actions are not performed for a non-incident channel + mock_client.conversations_history.assert_not_called() + + +def test_handle_reaction_added_empty_message_list(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.conversations_history.return_value = {"messages": []} + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + # Assert that the function tries to fetch replies when the message list is empty + mock_client.conversations_replies.assert_called_once() + + +def test_handle_reaction_added_message_in_thread(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.conversations_history.return_value = {"messages": []} + mock_client.conversations_replies.return_value = { + "messages": [{"ts": "123456", "user": "U123456"}] + } + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + # Assert that the function fetches thread replies + mock_client.conversations_replies.assert_called_once() + + +def test_handle_reaction_added_incident_report_document_not_found(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + # Simulate no incident report document found + mock_client.bookmarks_list.return_value = {"ok": True, "bookmarks": []} + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + mock_client.users_profile_get.assert_not_called() + + +def test_handle_reaction_added_adding_new_message_to_timeline(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.conversations_history.return_value = { + "ok": True, + "messages": [ + { + "type": "message", + "user": "U123ABC456", + "text": "Sample test message", + "ts": "1512085950.000216", + } + ], + } + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + incident.handle_reaction_added(mock_client, lambda: None, body, logger) + + # Make assertion that the function calls the correct functions + mock_client.conversations_history.assert_called_once() + mock_client.bookmarks_list.assert_called_once() + mock_client.users_profile_get.assert_called_once() + + +def test_handle_reaction_removed_successful_message_removal(): + # Mock the client and logger + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.users_profile_get.return_value = {"profile": {"real_name": "John Doe"}} + mock_client.bookmarks_list.return_value = { + "ok": True, + "bookmarks": [{"title": "Incident report", "link": "http://example.com"}], + } + mock_client.get_timeline_section.return_value = "Sample test message" + mock_client.replace_text_between_headings.return_value = True + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + mock_client.conversations_history.return_value = { + "ok": True, + "messages": [ + { + "type": "message", + "user": "U123ABC456", + "text": "Sample test message", + "ts": "1512085950.000216", + } + ], + } + + incident.handle_reaction_removed(mock_client, lambda: None, body, logger) + mock_client.conversations_history.assert_called_once() + mock_client.bookmarks_list.assert_called_once() + mock_client.users_profile_get.assert_called_once() + + +def test_handle_reaction_removed_message_not_in_timeline(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_info.return_value = {"channel": {"name": "incident-123"}} + mock_client.conversations_history.return_value = { + "messages": [{"ts": "123456", "user": "U123456"}] + } + mock_client.users_profile_get.return_value = {"profile": {"real_name": "John Doe"}} + mock_client.bookmarks_list.return_value = { + "ok": True, + "bookmarks": [{"title": "Incident report", "link": "http://example.com"}], + } + mock_client.get_timeline_section.return_value = "Some existing content" + mock_client.replace_text_between_headings.return_value = False + + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + + assert ( + incident.handle_reaction_removed(mock_client, lambda: None, body, logger) + is None + ) + + +def test_handle_reaction_removed_non_incident_channel_reaction_removal(): + mock_client = MagicMock() + + # Mock a non-incident channel + mock_client.conversations_info.return_value = {"channel": {"name": "general"}} + + # Assert that the function does not proceed with reaction removal + mock_client.conversations_history.assert_not_called() + + +def test_handle_reaction_removed_empty_message_list_handling(): + logger = MagicMock() + mock_client = MagicMock() + mock_client.conversations_history.return_value = {"messages": []} + body = { + "event": { + "reaction": "floppy_disk", + "item": {"channel": "C123456", "ts": "123456"}, + } + } + assert ( + incident.handle_reaction_removed(mock_client, lambda: None, body, logger) + is None + ) + + def helper_options(): return [{"text": {"type": "plain_text", "text": "name"}, "value": "id"}] diff --git a/app/tests/commands/test_utils.py b/app/tests/commands/test_utils.py index 6582435e..651d5121 100644 --- a/app/tests/commands/test_utils.py +++ b/app/tests/commands/test_utils.py @@ -181,3 +181,99 @@ def test_get_user_locale_without_locale(): user_id = MagicMock() client.users_info.return_value = {"ok": False} assert utils.get_user_locale(user_id, client) == "en-US" + + +def test_basic_functionality_rearrange_by_datetime_ascending(): + input_text = "2024-01-01 10:00:00 ET Message A\n" "2024-01-02 11:00:00 ET Message B" + expected_output = ( + "2024-01-01 10:00:00 ET Message A\n" "2024-01-02 11:00:00 ET Message B" + ) + assert utils.rearrange_by_datetime_ascending(input_text) == expected_output + + +def test_multiline_entries_rearrange_by_datetime_ascending(): + input_text = ( + "2024-01-01 10:00:00 ET Message A\nContinued\n" + "2024-01-02 11:00:00 ET Message B" + ) + expected_output = ( + "2024-01-01 10:00:00 ET Message A\nContinued\n" + "2024-01-02 11:00:00 ET Message B" + ) + assert utils.rearrange_by_datetime_ascending(input_text) == expected_output + + +def test_entries_out_of_order_rearrange_by_datetime_ascending(): + input_text = "2024-01-02 11:00:00 ET Message B\n" "2024-01-01 10:00:00 ET Message A" + expected_output = ( + "2024-01-01 10:00:00 ET Message A\n" "2024-01-02 11:00:00 ET Message B" + ) + assert utils.rearrange_by_datetime_ascending(input_text) == expected_output + + +def test_invalid_entries_rearrange_by_datetime_ascending(): + input_text = "Invalid Entry\n" "2024-01-01 10:00:00 ET Message A" + expected_output = "2024-01-01 10:00:00 ET Message A" + assert utils.rearrange_by_datetime_ascending(input_text) == expected_output + + +def test_empty_input_rearrange_by_datetime_ascending(): + assert utils.rearrange_by_datetime_ascending("") == "" + + +def test_no_datetime_entries_rearrange_by_datetime_ascending(): + input_text = "Message without datetime\nAnother message" + assert utils.rearrange_by_datetime_ascending(input_text) == "" + + +def test_convert_epoch_to_datetime_est_known_epoch_time(): + # Example: 0 epoch time corresponds to 1969-12-31 19:00:00 EST + assert utils.convert_epoch_to_datetime_est(0) == "1969-12-31 19:00:00 ET" + + +def test_convert_epoch_to_datetime_est_daylight_saving_time_change(): + # Test with an epoch time known to fall in DST transition + # For example, 1583652000 corresponds to 2020-03-08 03:20:00 EST + assert utils.convert_epoch_to_datetime_est(1583652000) == "2020-03-08 03:20:00 ET" + + +def test_convert_epoch_to_datetime_est_current_epoch_time(): + time = MagicMock() + time.return_value = 1609459200 + current_est = utils.convert_epoch_to_datetime_est(time) + assert current_est == "1969-12-31 19:00:01 ET" + + +def test_convert_epoch_to_datetime_est_edge_cases(): + # Test with the epoch time at 0 + assert utils.convert_epoch_to_datetime_est(0) == "1969-12-31 19:00:00 ET" + # Test with a very large epoch time, for example + assert utils.convert_epoch_to_datetime_est(32503680000) == "2999-12-31 19:00:00 ET" + + +def test_extract_googe_doc_id_valid_google_docs_url(): + url = "https://docs.google.com/document/d/1aBcD_efGHI/edit" + assert utils.extract_google_doc_id(url) == "1aBcD_efGHI" + + +def test_extract_googe_doc_id_oogle_docs_url_with_parameters(): + url = "https://docs.google.com/document/d/1aBcD_efGHI/edit?usp=sharing" + assert utils.extract_google_doc_id(url) == "1aBcD_efGHI" + + +def test_extract_googe_doc_id_non_google_docs_url(): + url = "https://www.example.com/page/d/1aBcD_efGHI/other" + assert utils.extract_google_doc_id(url) is None + + +def test_extract_googe_doc_id_invalid_url_format(): + url = "https://docs.google.com/document/1aBcD_efGHI" + assert utils.extract_google_doc_id(url) is None + + +def test_extract_googe_doc_id_empty_string(): + assert utils.extract_google_doc_id("") is None + + +def test_extract_googe_doc_id_none_input(): + assert utils.extract_google_doc_id(None) is None diff --git a/app/tests/intergrations/test_google_drive.py b/app/tests/intergrations/test_google_drive.py index aed827a7..e17a5dea 100644 --- a/app/tests/intergrations/test_google_drive.py +++ b/app/tests/intergrations/test_google_drive.py @@ -5,6 +5,10 @@ from unittest.mock import patch +# Constants for the test +START_HEADING = "Detailed Timeline" +END_HEADING = "Trigger" + @patch("integrations.google_drive.build") @patch("integrations.google_drive.pickle") @@ -228,6 +232,344 @@ def test_update_spreadsheet(get_google_service_mock): assert google_drive.update_spreadsheet_close_incident(channel_name) is True +def create_mock_document(content): + elements = [ + { + "paragraph": { + "elements": [ + {"startIndex": 1, "endIndex": 200, "textRun": {"content": text}} + ] + } + } + for text in content + ] + return {"body": {"content": elements}} + + +@patch("integrations.google_drive.get_google_service") +def test_extract_timeline_content(mock_service): + # Mock document content + content = [START_HEADING, "Timeline content", END_HEADING] + mock_document = create_mock_document(content) + print("Mock document is ", mock_document) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result == "Timeline content" + + +@patch("integrations.google_drive.get_google_service") +def test_extract_timeline_content_with_text_before_heading(mock_service): + # Mock document content + content = ["Some text", START_HEADING, "Timeline content", END_HEADING] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result == "Timeline content" + + +@patch("integrations.google_drive.get_google_service") +def test_extract_timeline_content_with_text_after_heading(mock_service): + # Mock document content + content = [START_HEADING, "Timeline content", END_HEADING, "Some text"] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result == "Timeline content" + + +@patch("integrations.google_drive.get_google_service") +def test_extract_timeline_content_with_text_between_heading(mock_service): + # Mock document content + content = [ + "Start of some text", + START_HEADING, + "Timeline content", + END_HEADING, + "End of some text", + ] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result == "Timeline content" + + +@patch("integrations.google_drive.get_google_service") +def test_get_timeline_section_no_headings(mock_service): + content = ["Some text", "Other text"] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result is None + + +@patch("integrations.google_drive.get_google_service") +def test_get_timeline_section_missing_start_heading(mock_service): + content = ["Some text", "Timeline content", END_HEADING, "Other text"] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result is None + + +@patch("integrations.google_drive.get_google_service") +def test_get_timeline_section_missing_end_heading(mock_service): + content = ["Some text", START_HEADING, "Timeline content", "Other text"] + mock_document = create_mock_document(content) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result is None + + +@patch("integrations.google_drive.get_google_service") +def test_get_timeline_section_empty_document(mock_service): + mock_document = create_mock_document([]) + mock_service.return_value.documents().get().execute.return_value = mock_document + + result = google_drive.get_timeline_section("document_id") + assert result is None + + +@patch("integrations.google_drive.get_google_service") +def test_replace_text_between_headings(mock_service): + doc_id = "" + # Mock document content + mock_document = { + "body": { + "content": [ + { + "paragraph": { + "elements": [ + {"textRun": {"content": START_HEADING, "endIndex": 20}} + ] + } + }, + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 40, + "startIndex": 20, + } + } + ] + } + }, + { + "paragraph": { + "elements": [ + {"textRun": {"content": END_HEADING, "startIndex": 40}} + ] + } + }, + ] + } + } + mock_service.return_value.documents().get().execute.return_value = mock_document + mock_service.return_value.documents().batchUpdate().execute.return_value = {} + + google_drive.replace_text_between_headings( + doc_id, mock_document, START_HEADING, END_HEADING + ) + assert mock_service.return_value.documents().batchUpdate.called + + +@patch("integrations.google_drive.get_google_service") +def test_replace_text_between_headings_more_text(mock_service): + doc_id = "" + # Mock document content + mock_document = { + "body": { + "content": [ + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Blah blah", + "endIndex": 40, + "startIndex": 1, + } + } + ] + } + }, + { + "paragraph": { + "elements": [ + {"textRun": {"content": START_HEADING, "endIndex": 45}} + ] + } + }, + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 60, + "startIndex": 50, + } + } + ] + } + }, + { + "paragraph": { + "elements": [ + {"textRun": {"content": END_HEADING, "startIndex": 70}} + ] + } + }, + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 100, + "startIndex": 80, + } + } + ] + } + }, + ] + } + } + mock_service.return_value.documents().get().execute.return_value = mock_document + mock_service.return_value.documents().batchUpdate().execute.return_value = {} + + google_drive.replace_text_between_headings( + doc_id, mock_document, START_HEADING, END_HEADING + ) + assert mock_service.return_value.documents().batchUpdate.called + + +@patch("integrations.google_drive.get_google_service") +def test_replace_text_between_headings_start_heading_not_found(mock_service): + doc_id = "mock_doc_id" + + # Mock document content where start heading does not exist + mock_document = { + "body": { + "content": [ + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 40, + "startIndex": 20, + } + } + ] + } + }, + { + "paragraph": { + "elements": [ + {"textRun": {"content": END_HEADING, "startIndex": 40}} + ] + } + }, + ] + } + } + mock_service.return_value.documents().get().execute.return_value = mock_document + + google_drive.replace_text_between_headings( + doc_id, mock_document, START_HEADING, END_HEADING + ) + + # Check if batchUpdate was not called as the start heading was not found + assert not mock_service.return_value.documents().batchUpdate.called + + +@patch("integrations.google_drive.get_google_service") +def test_replace_text_between_headings_end_heading_not_found(mock_service): + doc_id = "mock_doc_id" + + # Mock document content where start heading does not exist + mock_document = { + "body": { + "content": [ + { + "paragraph": { + "elements": [ + {"textRun": {"content": START_HEADING, "endIndex": 20}} + ] + } + }, + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 40, + "startIndex": 20, + } + } + ] + } + }, + ] + } + } + mock_service.return_value.documents().get().execute.return_value = mock_document + + google_drive.replace_text_between_headings( + doc_id, mock_document, START_HEADING, END_HEADING + ) + + # Check if batchUpdate was not called as the start heading was not found + assert not mock_service.return_value.documents().batchUpdate.called + + +@patch("integrations.google_drive.get_google_service") +def test_replace_text_between_headings_neither_heading_not_found(mock_service): + doc_id = "mock_doc_id" + + # Mock document content where start heading does not exist + mock_document = { + "body": { + "content": [ + { + "paragraph": { + "elements": [ + { + "textRun": { + "content": "Some old content", + "endIndex": 40, + "startIndex": 20, + } + } + ] + } + }, + ] + } + } + mock_service.return_value.documents().get().execute.return_value = mock_document + + google_drive.replace_text_between_headings( + doc_id, mock_document, START_HEADING, END_HEADING + ) + + # Check if batchUpdate was not called as the start heading was not found + assert not mock_service.return_value.documents().batchUpdate.called + + @patch("integrations.google_drive.list_metadata") def test_healthcheck_healthy(mock_list_metadata): mock_list_metadata.return_value = {"id": "test_doc"} diff --git a/app/tests/test_main.py b/app/tests/test_main.py index 291e8ff3..3e9afff3 100644 --- a/app/tests/test_main.py +++ b/app/tests/test_main.py @@ -46,9 +46,28 @@ def test_main_invokes_socket_handler( mock_app.action.assert_any_call("toggle_webhook") mock_app.action.assert_any_call("reveal_webhook") + mock_app.event.assert_any_call("reaction_added") + mock_app.event.assert_any_call("reaction_removed") + mock_socket_mode_handler.assert_called_once_with( mock_app, os.environ.get("APP_TOKEN") ) mock_scheduled_tasks.init.assert_called_once_with(mock_app) mock_scheduled_tasks.run_continuously.assert_called_once_with() + + +def test_is_floppy_disk_true(): + # Test case where the reaction is 'floppy_disk' + event = {"reaction": "floppy_disk"} + assert ( + main.is_floppy_disk(event) is True + ), "The function should return True for 'floppy_disk' reaction" + + +def test_is_floppy_disk_false(): + # Test case where the reaction is not 'floppy_disk' + event = {"reaction": "thumbs_up"} + assert ( + main.is_floppy_disk(event) is False + ), "The function should return False for reactions other than 'floppy_disk'"