Skip to content

Commit

Permalink
Merge pull request #8 from pmuller/feature/AGE_IDENTITY_FILE-environm…
Browse files Browse the repository at this point in the history
…ent-variable

feat: add support for the $AGE_IDENTITY_FILE environment variable
  • Loading branch information
pmuller authored Apr 30, 2024
2 parents b22dab5 + 19ae926 commit 0eb70b3
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 117 deletions.
65 changes: 52 additions & 13 deletions src/saltstack_age/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from collections.abc import Sequence
from getpass import getpass
from pathlib import Path
from typing import Literal

import pyrage

from saltstack_age.identities import read_identity_file
from saltstack_age.identities import get_identity_from_environment, read_identity_file
from saltstack_age.passphrase import get_passphrase_from_environment
from saltstack_age.secure_value import (
IdentitySecureValue,
Expand All @@ -18,21 +19,21 @@
LOGGER = logging.getLogger(__name__)


def normalize_identity(identity: str) -> Path:
def normalize_identity(identity: str) -> pyrage.x25519.Identity:
path = Path(identity)

if path.is_file():
return path
return read_identity_file(path)

raise ArgumentTypeError(f"Identity file does not exist: {identity}")


def parse_cli_arguments(args: Sequence[str] | None = None) -> Namespace:
parser = ArgumentParser(
description="Encrypt or decrypt secrets for use with saltstack-age renderer.",
epilog="When no passphrase or identity is provided, the tool defaults to "
"passphrase-based encryption and attempts to retrieve the passphrase from "
"the AGE_PASSPHRASE environment variable.",
epilog="When no passphrase or identity is provided, the tool tries to "
"retrieve a passphrase from the AGE_PASSPHRASE environment variable, "
"or an identity using the AGE_IDENTITY_FILE variable.",
)

type_parameters = parser.add_mutually_exclusive_group()
Expand Down Expand Up @@ -100,18 +101,52 @@ def get_passphrase(arguments: Namespace) -> str:
return passphrase


def get_identities(arguments: Namespace) -> list[pyrage.x25519.Identity]:
identities: list[pyrage.x25519.Identity] = arguments.identities or []

# When no identity is provided on the CLI, try to get one from the environment
if not identities:
identity_from_environment = get_identity_from_environment()
if identity_from_environment:
LOGGER.debug("Found identity file in environment")
identities.append(identity_from_environment)

return identities


def get_value(arguments: Namespace) -> str:
return arguments.value or sys.stdin.read()


def determine_encryption_type(
arguments: Namespace,
) -> Literal["identity", "passphrase"]:
if arguments.passphrase or arguments.passphrase_from_stdin:
return "passphrase"
if arguments.identities:
return "identity"

# We want the tool to be easy to use, so there is a lot of guesswork.
# But we also want to avoid inconsistent behaviors.
# So in case no passphrase or identity is passed to CLI,
# but both are configured in the environment, we raise an error.
identities = get_identities(arguments)
passphrase = get_passphrase(arguments)
if identities and passphrase:
LOGGER.critical("Error: Found both passphrase and identity file in environment")
raise SystemExit(-1)

if identities:
return "identity"

return "passphrase"


def encrypt(arguments: Namespace) -> None:
value = get_value(arguments).encode()

if arguments.identities:
recipients = [
read_identity_file(identity).to_public()
for identity in arguments.identities
]
if determine_encryption_type(arguments) == "identity":
recipients = [identity.to_public() for identity in get_identities(arguments)]
ciphertext = pyrage.encrypt(value, recipients)
LOGGER.info("ENC[age-identity,%s]", b64encode(ciphertext).decode())

Expand All @@ -124,15 +159,19 @@ def decrypt(arguments: Namespace) -> None:
secure_value = parse_secure_value(get_value(arguments))

if isinstance(secure_value, IdentitySecureValue):
if arguments.identities is None:
identities = get_identities(arguments)

if not identities:
LOGGER.critical("An identity is required to decrypt this value")
raise SystemExit(-1)
if len(arguments.identities) != 1:

if len(identities) != 1:
LOGGER.critical(
"A single identity must be passed to decrypt this value (got %d)",
len(arguments.identities),
)
raise SystemExit(-1)

LOGGER.info("%s", secure_value.decrypt(arguments.identities[0]))

else: # isinstance(secure_value, PassphraseSecureValue)
Expand Down
22 changes: 21 additions & 1 deletion src/saltstack_age/identities.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
import os
import re
from pathlib import Path

import pyrage


def read_identity_file(path: Path) -> pyrage.x25519.Identity:
def read_identity_file(path: Path | str) -> pyrage.x25519.Identity:
if isinstance(path, str):
path = Path(path)

# Remove comments
identity_string = re.sub(
r"^#.*\n?",
"",
path.read_text(),
flags=re.MULTILINE,
).rstrip("\n")

return pyrage.x25519.Identity.from_str(identity_string)


def get_identity_from_environment() -> pyrage.x25519.Identity | None:
path_string = os.environ.get("AGE_IDENTITY_FILE")

if path_string is None:
return None

path = Path(path_string)

if not path.is_file():
raise FileNotFoundError(f"AGE_IDENTITY_FILE does not exist: {path}")

return read_identity_file(path)
40 changes: 31 additions & 9 deletions src/saltstack_age/renderers/age.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from collections import OrderedDict
from importlib import import_module
from pathlib import Path
from typing import Any

import pyrage
from salt.exceptions import SaltRenderError

from saltstack_age.identities import get_identity_from_environment, read_identity_file
from saltstack_age.passphrase import get_passphrase_from_environment
from saltstack_age.secure_value import (
IdentitySecureValue,
Expand All @@ -29,26 +32,45 @@ def __virtual__() -> str | tuple[bool, str]: # noqa: N807
return __virtualname__


def _decrypt(string: str) -> str:
secure_value = parse_secure_value(string)
def _get_identity() -> pyrage.x25519.Identity:
# 1. Try to get identity file from Salt configuration
identity_file_string: str | None = __salt__["config.get"]("age_identity_file")
if identity_file_string:
identity_file_path = Path(identity_file_string)

if isinstance(secure_value, IdentitySecureValue):
identity_file: str | None = __salt__["config.get"]("age_identity_file")
if not identity_file_path.is_file():
raise SaltRenderError(
f"age_identity file does not exist: {identity_file_string}"
)

if not identity_file:
raise SaltRenderError("age_identity_file is not defined")
return read_identity_file(identity_file_path)

return secure_value.decrypt(identity_file)
# 2. Try to get identity from the environment
identity = get_identity_from_environment()
if identity:
return identity

# secure_value is a PassphraseSecureValue
raise SaltRenderError("No age identity file found in config or environment")


def _get_passphrase() -> str:
passphrase: str | None = (
__salt__["config.get"]("age_passphrase") or get_passphrase_from_environment()
)

if passphrase is None:
raise SaltRenderError("No age passphrase found in config or environment")

return secure_value.decrypt(passphrase)
return passphrase


def _decrypt(string: str) -> str:
secure_value = parse_secure_value(string)

if isinstance(secure_value, IdentitySecureValue):
return secure_value.decrypt(_get_identity())

return secure_value.decrypt(_get_passphrase())


def render(
Expand Down
11 changes: 2 additions & 9 deletions src/saltstack_age/secure_value.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import re
from base64 import b64decode
from dataclasses import dataclass
from pathlib import Path

import pyrage

from saltstack_age.identities import read_identity_file

RE_SECURE_VALUE = re.compile(
r"""
ENC\[
Expand Down Expand Up @@ -42,12 +39,8 @@ def decrypt(self, passphrase: str) -> str:


class IdentitySecureValue(SecureValue):
def decrypt(self, identity: Path | str) -> str:
if isinstance(identity, str):
identity = Path(identity)
if not identity.is_file():
raise FileNotFoundError(f"Identity file does not exist: {identity}")
return pyrage.decrypt(self.ciphertext, [read_identity_file(identity)]).decode()
def decrypt(self, identity: pyrage.x25519.Identity) -> str:
return pyrage.decrypt(self.ciphertext, [identity]).decode()


def parse_secure_value(string: str) -> PassphraseSecureValue | IdentitySecureValue:
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pathlib import Path

import pytest

ROOT = Path(__file__).parent.parent
EXAMPLE_PATH = ROOT / "example"


@pytest.fixture()
def example_age_key() -> str:
return str(EXAMPLE_PATH / "config" / "age.key")
27 changes: 8 additions & 19 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import os
from pathlib import Path

import pytest
from saltfactories.cli.call import SaltCall
from saltfactories.daemons.minion import SaltMinion
from saltfactories.manager import FactoriesManager
from saltfactories.utils import random_string

ROOT = Path(__file__).parent.parent.parent
EXAMPLE_PATH = ROOT / "example"
from tests.conftest import EXAMPLE_PATH, ROOT

MINION_CONFIG = {
"file_client": "local",
"master_type": "disable",
"pillar_roots": {"base": [str(EXAMPLE_PATH / "pillar")]},
"file_roots": {"base": [str(EXAMPLE_PATH / "states")]},
}


@pytest.fixture(scope="session")
Expand All @@ -24,20 +27,6 @@ def salt_factories_config() -> dict[str, str | int | bool | None]:
}


@pytest.fixture(scope="package")
def minion(salt_factories: FactoriesManager) -> SaltMinion:
return salt_factories.salt_minion_daemon(
random_string("minion-"),
overrides={
"file_client": "local",
"master_type": "disable",
"pillar_roots": {"base": [str(EXAMPLE_PATH / "pillar")]},
"file_roots": {"base": [str(EXAMPLE_PATH / "states")]},
"age_identity_file": str(EXAMPLE_PATH / "config" / "age.key"),
},
)


@pytest.fixture()
def salt_call_cli(minion: SaltMinion) -> SaltCall:
return minion.salt_call_cli()
9 changes: 0 additions & 9 deletions tests/integration/test_age.py

This file was deleted.

12 changes: 8 additions & 4 deletions tests/integration/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyrage
import pytest
from saltstack_age.cli import main
from saltstack_age.identities import read_identity_file
from saltstack_age.secure_value import (
IdentitySecureValue,
PassphraseSecureValue,
Expand All @@ -25,17 +26,20 @@ def test_encrypt__passphrase(caplog: pytest.LogCaptureFixture) -> None:
assert secure_value.decrypt("woah that is so secret") == "another secret"


def test_encrypt__single_recipient(caplog: pytest.LogCaptureFixture) -> None:
def test_encrypt__single_recipient(
caplog: pytest.LogCaptureFixture,
example_age_key: str,
) -> None:
# Only keep INFO log records
caplog.set_level(logging.INFO)
# Run the CLI tool
main(["-i", "example/config/age.key", "enc", "foo"])
main(["-i", example_age_key, "enc", "foo"])
# Ensure we get an identity secure value string
secure_value_string = caplog.record_tuples[0][2]
secure_value = parse_secure_value(secure_value_string)
assert isinstance(secure_value, IdentitySecureValue)
# Ensure we can decrypt it using the same identity
assert secure_value.decrypt("example/config/age.key") == "foo"
assert secure_value.decrypt(read_identity_file(example_age_key)) == "foo"


def test_encrypt__multiple_recipients(
Expand Down Expand Up @@ -67,7 +71,7 @@ def test_encrypt__multiple_recipients(
assert isinstance(secure_value, IdentitySecureValue)
# Ensure we can decrypt it using all the recipient identities
for identity_path in (identity1_path, identity2_path):
assert secure_value.decrypt(identity_path) == "foo"
assert secure_value.decrypt(read_identity_file(identity_path)) == "foo"


@pytest.mark.parametrize(
Expand Down
25 changes: 25 additions & 0 deletions tests/integration/test_renderer_identity_from_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from pathlib import Path

import pytest
from saltfactories.cli.call import SaltCall
from saltfactories.daemons.minion import SaltMinion
from saltfactories.manager import FactoriesManager
from saltfactories.utils import random_string

from tests.integration.conftest import MINION_CONFIG


@pytest.fixture()
def minion(salt_factories: FactoriesManager, example_age_key: str) -> SaltMinion:
overrides = MINION_CONFIG.copy()
overrides["age_identity_file"] = example_age_key
return salt_factories.salt_minion_daemon(
random_string("minion-"),
overrides=overrides,
)


def test(salt_call_cli: SaltCall, tmp_path: Path) -> None:
_ = salt_call_cli.run("state.apply", pillar=f'{{"prefix": "{tmp_path}"}}')
assert (tmp_path / "test-public").read_text() == "that's not a secret\n"
assert (tmp_path / "test-private").read_text() == "test-secret-value\n"
Loading

0 comments on commit 0eb70b3

Please sign in to comment.