Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Secure OS Storage Option #624

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import base64
import json
import logging
import os
import platform
import subprocess

from keeper_secrets_manager_core import exceptions
from keeper_secrets_manager_core.configkeys import ConfigKeys
from keeper_secrets_manager_core.storage import KeyValueStorage
from keeper_secrets_manager_core.keeper_globals import logger_name
from keeper_secrets_manager_core.utils import base64_to_string, json_to_dict


class SecureOSStorage(KeyValueStorage):
"""Secure OS based implementation of the key value storage

Uses either the Windows Credential Manager, Linux Keyring or macOS Keychain to store
the config. The config is stored as a base64 encoded string.
"""
def __init__(self, app_name, exec_path):
if not app_name:
logging.getLogger(logger_name).error("An application name is required for SecureOSStorage")
raise exceptions.KeeperError("An application name is required for SecureOSStorage")

self.app_name = app_name
self._machine_os = platform.system()

if not exec_path:
self._exec_path = self._find_exe_path()
if not self._exec_path:
logging.getLogger(logger_name).error("Could not find secure config executable")
raise exceptions.KeeperError("Could not find secure config executable")
else:
self._exec_path = exec_path

self.config = {}

def _find_exe_path(self) -> str | None:
if path := os.getenv("KSM_CONFIG_EXE_PATH"):
return path

if self._machine_os == "Windows":
return self._run_command(["powershell", "-command", "(Get-Command wcm).Source"])
elif self._machine_os == "Linux":
return self._run_command(["which", "lku"])

def _run_command(self, args: list[str | list]) -> str:
"""Run a command and return the output of stdout."""

# Flatten args list in instance that it has nested lists
args_list = [item for arg in args for item in (arg if isinstance(arg, list) else [arg])]

try:
completed_process = subprocess.run(args_list, capture_output=True, check=True)
if completed_process.stdout:
return completed_process.stdout.decode().strip()
else:
# Some commands do not return anything to stdout on success, such as the 'set' command.
if completed_process.returncode == 0:
return ""
else:
logging.getLogger(logger_name).error(
f"Failed to run command: {args_list}, which returned {completed_process.stderr}"
)
raise exceptions.KeeperError(f"Command: {args_list} returned empty stdout")

except subprocess.CalledProcessError:
logging.getLogger(logger_name).error(f"Failed to run command: {args_list}")
raise exceptions.KeeperError(f"Failed to run command: {args_list}")

def read_storage(self) -> dict:
result = self._run_command([self._exec_path, "get", self.app_name])
if not result:
logging.getLogger(logger_name).error("Failed to read config or config does not exist")
return self.config

config = json_to_dict(base64_to_string(result))
for key in config:
self.config[ConfigKeys.get_enum(key)] = config[key]
return self.config

def save_storage(self) -> None:
# Convert current self.config to base64 and save it
b64_config = base64.b64encode(json.dumps(self.config).encode())
result = self._run_command([self._exec_path, "set", self.app_name, b64_config])
if result == "":
logging.getLogger(logger_name).info("Config saved successfully")

def get(self, key: ConfigKeys):
return self.config.get(key)

def set(self, key: ConfigKeys, value):
self.config[key] = value

def delete(self, key: ConfigKeys):
self.config.pop(key, None)

def delete_all(self):
self.config = {}

def contains(self, key: ConfigKeys):
return key in self.config
23 changes: 23 additions & 0 deletions sdk/python/storage/tests/mock_secure_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# _ __
# | |/ /___ ___ _ __ ___ _ _ (R)
# | ' </ -_) -_) '_ \/ -_) '_|
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper Secrets Manager
# Copyright 2024 Keeper Security Inc.
# Contact: [email protected]

import sys
from keeper_secrets_manager_core.mock import MockConfig


if __name__ == "__main__":
args = sys.argv[1:]

if args[0] == "get":
config = MockConfig().make_base64()
print(config)

exit(0)
68 changes: 68 additions & 0 deletions sdk/python/storage/tests/storage_secure_os_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import sys
import unittest
import os

from keeper_secrets_manager_core.configkeys import ConfigKeys
from keeper_secrets_manager_core.mock import MockConfig
from keeper_secrets_manager_storage.storage_secure_os import SecureOSStorage


class SecureOSTest(unittest.TestCase):
def setUp(self):
self.orig_working_dir = os.getcwd()

# Create mock secure storage executable
self.mock_exec_path = os.path.join(
self.orig_working_dir, "tests", "mock_secure_exec.py"
)

# sys.executable returns the path of the current python interpreter
# which is used to run the mock_secure_exec.py file
self.python_interpreter = sys.executable

def tearDown(self):
os.chdir(self.orig_working_dir)

def test_secure_os_storage(self):
mock_config = MockConfig.make_config()
storage = SecureOSStorage(app_name="TEST", exec_path="test.exe")

# test set() and get()
storage.set(ConfigKeys.KEY_CLIENT_ID, mock_config.get("clientId"))
storage.set(ConfigKeys.KEY_APP_KEY, mock_config.get("appKey"))
storage.set(ConfigKeys.KEY_PRIVATE_KEY, mock_config.get("privateKey"))
self.assertEqual(
mock_config.get("clientId"), storage.get(ConfigKeys.KEY_CLIENT_ID)
)
self.assertEqual(mock_config.get("appKey"), storage.get(ConfigKeys.KEY_APP_KEY))
self.assertEqual(
mock_config.get("privateKey"), storage.get(ConfigKeys.KEY_PRIVATE_KEY)
)

# test contains()
self.assertTrue(storage.contains(ConfigKeys.KEY_CLIENT_ID))

# test delete()
storage.delete(ConfigKeys.KEY_CLIENT_ID)
self.assertIsNone(storage.get(ConfigKeys.KEY_CLIENT_ID))

# test delete_all()
storage.delete_all()
self.assertIsNone(storage.get(ConfigKeys.KEY_APP_KEY))

def test_secure_os_storage_read_storage(self):
storage = SecureOSStorage(
app_name="TEST", exec_path=[self.python_interpreter, self.mock_exec_path]
)

storage.read_storage()
self.assertIsNotNone(storage.get(ConfigKeys.KEY_CLIENT_ID))

def test_secure_os_storage_save_storage(self):
storage = SecureOSStorage(
app_name="TEST", exec_path=[self.python_interpreter, self.mock_exec_path]
)
storage.config = MockConfig.make_config()

# Test save_storage() doesn't raise an exception
storage.save_storage()