Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Secure OS Storage Option #624

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions sdk/python/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,55 @@ The Keeper Secrets Manager Storage module for working with custom key-value stor

For more information see our official documentation page https://docs.keeper.io/secrets-manager/secrets-manager

# Change Log
## Secure OS Storage

## 1.0.1
The `SecureOSStorage` interface makes use of the `windows-credential-manager-utility` or the `linux-keyring-utility` to interact with the operating systems native secrets platform (i.e. Windows Credential Manager or Linux Keyring) to secure Keeper configuration files.

### Usage

To use the secure storage, create an instance of the `SecureOSStorage` class and pass the name of the application that will be using the storage. This name will be used to identify the applications configuration in the OS's storage platform.

```python
from keeper_secrets_manager_storage import SecureOSStorage

storage = SecureOSStorage(app_name='my_app')
```

By default, `SecureOSStorage` will check `PATH` for one of the above utilities.

By setting the environment variable `KSM_CONFIG_EXE_PATH` to the path of the utility, users have the option to run the executable directly without it being in `PATH`.

In addition, users can pass the path to the utility executable as an argument to the `SecureOSStorage` constructor.

```python
storage = SecureOSStorage(app_name='my_app', exe_path='path/to/utility')
```

Should the executable need to be run as a different user or as with elevated priviledges, the `SecureOSStorage` constructor accepts an optional `run_as` parameter. This value will be prepended to the command that is run to interact with the OS's storage platform.

```python
storage = SecureOSStorage(app_name='my_app', exe_path='path/to/utility', run_as='sudo')
```

You can retrieve the current configuration of an application by calling `read_storage()` on the storage object.

```python
config = storage.read_storage()
```

Save a configuration by calling `save_storage()` on the storage object.

```python
storage.save_storage()
```

> Note: Saving a configuration will overwrite an existing configuration with the same application name without warning.

## Change Log

### 1.0.1

- Added new storage type storage type for AWS Secrets Manager

## 1.0.0
### 1.0.0
- Initial release
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import base64
import hashlib
import json
import logging
import os
import platform
import subprocess
from enum import Enum

from keeper_secrets_manager_core import exceptions
from keeper_secrets_manager_core.configkeys import ConfigKeys
from keeper_secrets_manager_core.keeper_globals import logger_name
from keeper_secrets_manager_core.storage import KeyValueStorage
from keeper_secrets_manager_core.utils import base64_to_string, json_to_dict


class LKUChecksums(Enum):
"""Checksums for the Linux Keyring Utility (lku)"""

V0_1_0 = "3B8AB72D5BE95B4FDD3D56A5ECD6C75EF121CCC36520341B61B8E6DEDBFB5128"
V0_1_1 = "5C9848AAB7ABCC1842C941D6EB42A55E0C2AD140E5D8F94CA798DF1B336ECFDF"


class WCMChecksums(Enum):
"""Checksums for the Windows Credential Manager Utility (wcm)"""

V0_1_0 = "50A431188DDBFA7D963304D6ED3B0C6D0B68A0B0703DE0D96C2BB4D0FB2F77F4"
V0_2_0 = "A166E71F02FE51B5AA132E8664EF4A8922F42AA889E0962DCE5F7ABAD5DCDA0A"
V0_2_1 = "8EAEB30AE5DEC8F1C3D957C3BC0433D8F18FCC03E5C761A5C1A6C7AE41264105"


def is_valid_checksum(file: str, checksums) -> bool:
with open(file, "rb") as f:
file_hash = hashlib.file_digest(f, "sha256")

for checksum in checksums:
if file_hash.hexdigest().upper() == checksum.value:
return True
return False


class SecureOSStorage(KeyValueStorage):
"""Secure OS based implementation of the key value storage

Uses either the Windows Credential Manager, Linux Keyring or macOS Keychain to store
the config. The config is stored as a base64 encoded string.
"""

def __init__(
self,
app_name: str,
exec_path: str,
run_as: str = None,
_lku_checksums=LKUChecksums,
_wcm_checksums=WCMChecksums,
):
if not app_name:
logging.getLogger(logger_name).error(
"An application name is required for SecureOSStorage"
)
raise exceptions.KeeperError(
"An application name is required for SecureOSStorage"
)

self.app_name = app_name
self.lku_checksums = _lku_checksums
self.wcm_checksums = _wcm_checksums
self._run_as = run_as
self._machine_os = platform.system()

if not exec_path:
self._exec_path = self._find_exe_path()
if not self._exec_path:
logging.getLogger(logger_name).error(
"Could not find secure config executable"
)
raise exceptions.KeeperError("Could not find secure config executable")
else:
self._exec_path = exec_path

self.config = {}

def _find_exe_path(self) -> str | None:
if path := os.getenv("KSM_CONFIG_EXE_PATH"):
return path

if self._machine_os == "Windows":
return self._run_command(
["powershell", "-command", "(Get-Command wcm).Source"]
)
elif self._machine_os == "Linux":
return self._run_command(["which", "lku"])

def _run_command(self, args: list[str]) -> str:
"""Run a command and return the output of stdout."""

# Check if the checksum of the executable is valid every time it is called, as
# self._exec_path could be changed during the lifetime of the object.
if self._machine_os == "Windows":
valid = is_valid_checksum(self._exec_path, self.wcm_checksums)
elif self._machine_os == "Linux":
valid = is_valid_checksum(self._exec_path, self.lku_checksums)
else:
valid = False

if not valid:
logging.getLogger(logger_name).error(
f"Checksum for {self._exec_path} is invalid"
)
raise exceptions.KeeperError(f"Checksum for {self._exec_path} is invalid")

# Insert the run_as command at the beginning of the args list if it exists
if self._run_as:
args.insert(0, self._run_as)

try:
completed_process = subprocess.run(args, capture_output=True, check=True)
if completed_process.stdout:
return completed_process.stdout.decode().strip()
else:
# Some commands do not return anything to stdout on success, such as the 'set' command.
if completed_process.returncode == 0:
return ""
else:
logging.getLogger(logger_name).error(
f"Failed to run command: {args}, which returned {completed_process.stderr}"
)
raise exceptions.KeeperError(
f"Command: {args} returned empty stdout"
)

except subprocess.CalledProcessError:
logging.getLogger(logger_name).error(f"Failed to run command: {args}")
raise exceptions.KeeperError(f"Failed to run command: {args}")

def read_storage(self) -> dict:
result = self._run_command([self._exec_path, "get", self.app_name])
if not result:
logging.getLogger(logger_name).error(
"Failed to read config or config does not exist"
)
return self.config

config = json_to_dict(base64_to_string(result))
for key in config:
self.config[ConfigKeys.get_enum(key)] = config[key]
return self.config

def save_storage(self) -> None:
# Convert current self.config to base64 and save it
b64_config = base64.b64encode(json.dumps(self.config).encode())
result = self._run_command([self._exec_path, "set", self.app_name, b64_config])
if result == "":
logging.getLogger(logger_name).info("Config saved successfully")

def get(self, key: ConfigKeys):
return self.config.get(key)

def set(self, key: ConfigKeys, value):
self.config[key] = value

def delete(self, key: ConfigKeys):
self.config.pop(key, None)

def delete_all(self):
self.config = {}

def contains(self, key: ConfigKeys):
return key in self.config
23 changes: 23 additions & 0 deletions sdk/python/storage/tests/mock_secure_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# _ __
# | |/ /___ ___ _ __ ___ _ _ (R)
# | ' </ -_) -_) '_ \/ -_) '_|
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper Secrets Manager
# Copyright 2024 Keeper Security Inc.
# Contact: [email protected]

import sys

from keeper_secrets_manager_core.mock import MockConfig

if __name__ == "__main__":
args = sys.argv[1:]

if args[0] == "get":
config = MockConfig().make_base64()
print(config)

exit(0)
104 changes: 104 additions & 0 deletions sdk/python/storage/tests/storage_secure_os_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import sys
import tempfile
import unittest
from enum import Enum

from keeper_secrets_manager_core.configkeys import ConfigKeys
from keeper_secrets_manager_core.mock import MockConfig

from keeper_secrets_manager_storage.storage_secure_os import (
SecureOSStorage,
is_valid_checksum,
)


class MockChecksums(Enum):
MOCK_EXEC_PATH_CHECKSUM = (
"712B227DDF2C13F218D217428A10B892B0D66201696C17331A808D18A52AD70F"
)


class SecureOSTest(unittest.TestCase):
def setUp(self):
self.orig_working_dir = os.getcwd()

# Create mock secure storage executable
self.mock_exec_path = os.path.join(
self.orig_working_dir, "tests", "mock_secure_exec.py"
)

# sys.executable returns the path of the current python interpreter
# which is used to run the mock_secure_exec.py file
self.python_interpreter = sys.executable

# Create a temporary directory to store temp files
self.test_dir = tempfile.TemporaryDirectory()

def tearDown(self):
# WCMChecksums = self.original_wcm_checksums
# LKUChecksums = self.original_lku_checksums
self.test_dir.cleanup()

def test_secure_os_storage(self):
mock_config = MockConfig.make_config()
storage = SecureOSStorage(app_name="TEST", exec_path="test.exe")

# test set() and get()
storage.set(ConfigKeys.KEY_CLIENT_ID, mock_config.get("clientId"))
storage.set(ConfigKeys.KEY_APP_KEY, mock_config.get("appKey"))
storage.set(ConfigKeys.KEY_PRIVATE_KEY, mock_config.get("privateKey"))
self.assertEqual(
mock_config.get("clientId"), storage.get(ConfigKeys.KEY_CLIENT_ID)
)
self.assertEqual(mock_config.get("appKey"), storage.get(ConfigKeys.KEY_APP_KEY))
self.assertEqual(
mock_config.get("privateKey"), storage.get(ConfigKeys.KEY_PRIVATE_KEY)
)

# test contains()
self.assertTrue(storage.contains(ConfigKeys.KEY_CLIENT_ID))

# test delete()
storage.delete(ConfigKeys.KEY_CLIENT_ID)
self.assertIsNone(storage.get(ConfigKeys.KEY_CLIENT_ID))

# test delete_all()
storage.delete_all()
self.assertIsNone(storage.get(ConfigKeys.KEY_APP_KEY))

def test_secure_os_storage_read_storage(self):
storage = SecureOSStorage(
app_name="TEST",
exec_path=self.mock_exec_path,
run_as=self.python_interpreter,
_lku_checksums=MockChecksums,
_wcm_checksums=MockChecksums,
)

storage.read_storage()
self.assertIsNotNone(storage.get(ConfigKeys.KEY_CLIENT_ID))

def test_secure_os_storage_save_storage(self):
storage = SecureOSStorage(
app_name="TEST",
exec_path=self.mock_exec_path,
run_as=self.python_interpreter,
_lku_checksums=MockChecksums,
_wcm_checksums=MockChecksums,
)
storage.config = MockConfig.make_config()

# Test save_storage() doesn't raise an exception
storage.save_storage()

def test_is_valid_checksum(self):
# Test valid checksum
self.assertTrue(is_valid_checksum(self.mock_exec_path, MockChecksums))

# Test invalid checksum
file_path = os.path.join(self.test_dir.name, "invalid.txt")
with open(file_path, "w") as f:
f.write("Invalid checksum")

self.assertFalse(is_valid_checksum(file_path, MockChecksums))