From 7f0a270e5a2b4ecde6ee758b55d735f6a7df7077 Mon Sep 17 00:00:00 2001 From: naisanzaa Date: Mon, 2 Sep 2024 16:03:45 -0700 Subject: [PATCH] keepass_to_pass: add integration --- README.md | 2 +- .../integrations/keepass_to_pass/__init__.py | 4 + .../integrations/keepass_to_pass/client.py | 80 +++++++++++++++++++ .../keepass_to_pass/tests/__init__.py | 0 .../keepass_to_pass/tests/test_keepass.py | 23 ++++++ 5 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 automon/integrations/keepass_to_pass/__init__.py create mode 100644 automon/integrations/keepass_to_pass/client.py create mode 100644 automon/integrations/keepass_to_pass/tests/__init__.py create mode 100644 automon/integrations/keepass_to_pass/tests/test_keepass.py diff --git a/README.md b/README.md index 48e7bd73..eb437dde 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Github issues and feature requests welcomed. | API | flask | | Chat | slack | | Data Scraping | beautifulsoup
facebook groups
instagram
scrapy | -| Databases | elasticsearch
neo4j
splunk | +| Databases | elasticsearch
neo4j
splunk
pass | | Data Store | minio
swift | | Devices | snmp | | Google Cloud | google auth api
google people api
google sheets api | diff --git a/automon/integrations/keepass_to_pass/__init__.py b/automon/integrations/keepass_to_pass/__init__.py new file mode 100644 index 00000000..373397f0 --- /dev/null +++ b/automon/integrations/keepass_to_pass/__init__.py @@ -0,0 +1,4 @@ +from .client import ( + KeepassClient, + PassClient +) diff --git a/automon/integrations/keepass_to_pass/client.py b/automon/integrations/keepass_to_pass/client.py new file mode 100644 index 00000000..76251429 --- /dev/null +++ b/automon/integrations/keepass_to_pass/client.py @@ -0,0 +1,80 @@ +import os +import csv + +from automon import log +from automon.helpers import Run + +logger = log.logging.getLogger(__name__) +logger.setLevel(level=log.DEBUG) + + +class KeepassClient(object): + + def __init__(self): + self.database_csv_path = None + + def read_database_csv(self, database_csv_path: str, delimiter: str = ',', quotechar: str = '"') -> csv.reader: + if self.set_database_csv_path(database_csv_path=database_csv_path): + logger.debug(f'KeepassClient :: read_database_csv :: read') + with open(self.database_csv_path, 'r') as csvfile: + return [KeepassEntry(pw) for pw in csv.reader(csvfile, delimiter=delimiter, quotechar=quotechar)][1:] + + raise OSError(f'KeepassClient :: read_database_csv :: ERROR :: {database_csv_path}') + + def set_database_csv_path(self, database_csv_path: str) -> True: + if os.path.exists(database_csv_path): + self.database_csv_path = database_csv_path + logger.debug( + f'KeepassClient :: set_database_csv_path :: {database_csv_path} :: {os.stat(database_csv_path)}') + return True + + raise FileNotFoundError(f'KeepassClient :: set_database_csv_path :: ERROR :: {database_csv_path}') + + +class KeepassEntry(object): + ENTRY_FORMAT: dict = { + 'Group': 0, + 'Title': 1, + 'Username': 2, + 'Password': 3, + 'URL': 4, + 'Notes': 5, + 'TOPT': 6, + 'Icon': 7, + 'Last Modified': 8, + 'Created': 9, + } + + def __init__(self, cvs_row: list): + self.Group: str = cvs_row[self.ENTRY_FORMAT['Group']] + self.Title: str = cvs_row[self.ENTRY_FORMAT['Title']] + self.Username: str = cvs_row[self.ENTRY_FORMAT['Username']] + self.Password: str = cvs_row[self.ENTRY_FORMAT['Password']] + self.URL: str = cvs_row[self.ENTRY_FORMAT['URL']] + self.Notes: str = cvs_row[self.ENTRY_FORMAT['Notes']] + self.TOPT: str = cvs_row[self.ENTRY_FORMAT['TOPT']] + self.Icon: str = cvs_row[self.ENTRY_FORMAT['Icon']] + self.LastModified: str = cvs_row[self.ENTRY_FORMAT['Last Modified']] + self.Created: str = cvs_row[self.ENTRY_FORMAT['Created']] + + def __str__(self): + return f'{self.Group} :: {self.Title} :: {self.Username} :: {self.URL}' + + def to_pass(self): + return f"""{self.Password} +Title: {self.Title} +Username: {self.Username} +URL: {self.URL} +TOPT: {self.TOPT} +Icon: {self.Icon} +Last Modified: {self.LastModified} +Created: {self.Created} +Notes: {self.Notes} +""" + + +class PassClient(object): + TEMPLATE: str = f'' + + def __init__(self): + self._runner = Run() diff --git a/automon/integrations/keepass_to_pass/tests/__init__.py b/automon/integrations/keepass_to_pass/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/automon/integrations/keepass_to_pass/tests/test_keepass.py b/automon/integrations/keepass_to_pass/tests/test_keepass.py new file mode 100644 index 00000000..d6c78f8d --- /dev/null +++ b/automon/integrations/keepass_to_pass/tests/test_keepass.py @@ -0,0 +1,23 @@ +import unittest + +from automon.integrations.keepass_to_pass import KeepassClient + +test = KeepassClient() + + +class TestKeepassClient(unittest.TestCase): + def test_set_database_csv_path(self): + with self.assertRaises(FileNotFoundError): + test.set_database_csv_path('AAAA.XXXCSV') + + open('BBBB.XXXCSV', 'w').close() + + self.assertTrue(test.set_database_csv_path('BBBB.XXXCSV')) + + def test_read_database_csv(self): + with self.assertRaises(OSError): + test.read_database_csv('AAAA.XXXCSV') + + +if __name__ == '__main__': + unittest.main()