Skip to content

Commit

Permalink
coord: Add support for FilesystemSecretsController (MaterializeInc#11628)
Browse files Browse the repository at this point in the history

Continuation of the SECRETS work as defined in its design.

This PR adds a configurable local filesystem secrets controller. The controller stores the contents of each secret on disk inside the mzdata/secrets directory, in a file named after the secret's GlobalId.

The user can select which secrets controller to use via --secrets-controller; the two options are local-file-system and kubernetes. The default is the FilesystemSecretsController.

This PR also adds the scaffolding for the Kubernetes controller, though that controller does not do anything yet.

Motivation
This PR adds a known-desirable feature.

Co-authored-by: Philip Stoev <[email protected]>
  • Loading branch information
Martin Kysel and philip-stoev authored Apr 12, 2022
1 parent 6f3f8a8 commit 6dda6aa
Show file tree
Hide file tree
Showing 20 changed files with 399 additions and 10 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ members = [
"src/repr-test-util",
"src/repr",
"src/s3-datagen",
"src/secrets",
"src/secrets-filesystem",
"src/secrets-kubernetes",
"src/sql-parser",
"src/sql",
"src/sqllogictest",
Expand Down
8 changes: 8 additions & 0 deletions ci/nightly/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ steps:
- { value: feature-benchmark-persistence }
- { value: aws-config }
- { value: zippy }
- { value: secrets }
- { value: persistence-failpoints }
- { value: catalog-compat }
multiple: true
Expand Down Expand Up @@ -209,6 +210,13 @@ steps:
- ./ci/plugins/mzcompose:
composition: zippy

- id: secrets
label: "Secrets"
timeout_in_minutes: 30
plugins:
- ./ci/plugins/mzcompose:
composition: secrets

- id: persistence-failpoints
label: Persistence failpoints
depends_on: build-x86_64
Expand Down
8 changes: 7 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,18 @@ def exec(
stdin: read STDIN from a string.
"""

entrypoint = (
self.compose["services"][service]["entrypoint"]
if "entrypoint" in self.compose["services"][service]
else None
)

return self.invoke(
"exec",
*(["--detach"] if detach else []),
"-T",
service,
*self.compose["services"][service]["entrypoint"],
*([entrypoint] if entrypoint else []),
*args,
stdin=stdin,
)
Expand Down
1 change: 1 addition & 0 deletions src/coord/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mz-stash = { path = "../stash" }
mz-sql = { path = "../sql" }
mz-sql-parser = { path = "../sql-parser" }
mz-transform = { path = "../transform" }
mz-secrets = { path = "../secrets"}
postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" }
prometheus = { version = "0.13.0", default-features = false }
prost = "0.9.0"
Expand Down
58 changes: 52 additions & 6 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::StreamExt;
use itertools::Itertools;
use rand::Rng;
use timely::order::PartialOrder;
use timely::progress::frontier::MutableAntichain;
Expand All @@ -104,8 +105,8 @@ use mz_dataflow_types::{
Update,
};
use mz_expr::{
permutation_for_arrangement, CollectionPlan, EvalError, ExprHumanizer, GlobalId,
MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
permutation_for_arrangement, CollectionPlan, ExprHumanizer, GlobalId, MirRelationExpr,
MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{to_datetime, EpochMillis, NowFn};
Expand All @@ -116,6 +117,7 @@ use mz_ore::thread::JoinHandleExt;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
use mz_repr::{Datum, Diff, RelationDesc, RelationType, Row, RowArena, ScalarType, Timestamp};
use mz_secrets::{SecretOp, SecretsController};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
CreateIndexStatement, CreateSinkStatement, CreateSourceStatement, ExplainStage, FetchStatement,
Expand Down Expand Up @@ -257,6 +259,7 @@ pub struct Config {
pub metrics_registry: MetricsRegistry,
pub persister: PersisterWithConfig,
pub now: NowFn,
pub secrets_controller: Box<dyn SecretsController>,
}

struct PendingPeek {
Expand Down Expand Up @@ -328,6 +331,10 @@ pub struct Coordinator {
write_lock: Arc<tokio::sync::Mutex<()>>,
/// Holds plans deferred due to write lock.
write_lock_wait_group: VecDeque<DeferredPlan>,

/// Handle to secret manager that can create and delete secrets from
/// an arbitrary secret storage engine.
secrets_controller: Box<dyn SecretsController>,
}

/// Metadata about an active connection.
Expand Down Expand Up @@ -2118,18 +2125,22 @@ impl Coordinator {
let evaled = secret.secret_as.eval(&[], &temp_storage)?;

if evaled == Datum::Null {
return Err(CoordError::Eval(EvalError::NullCharacterNotPermitted));
coord_bail!("secret value can not be null");
}

// TODO martin: hook the payload into a secrets backend
let _payload = evaled.unwrap_bytes();
let payload = evaled.unwrap_bytes();

let id = self.catalog.allocate_user_id()?;
let oid = self.catalog.allocate_oid()?;
let secret = catalog::Secret {
create_sql: format!("CREATE SECRET {} AS '********'", full_name),
};

self.secrets_controller.apply(vec![SecretOp::Ensure {
id,
contents: Vec::from(payload),
}])?;

let ops = vec![catalog::Op::CreateItem {
id,
oid,
Expand All @@ -2143,7 +2154,18 @@ impl Coordinator {
kind: catalog::ErrorKind::ItemAlreadyExists(_),
..
})) if if_not_exists => Ok(ExecuteResponse::CreatedSecret { existed: true }),
Err(err) => Err(err),
Err(err) => {
match self.secrets_controller.apply(vec![SecretOp::Delete { id }]) {
Ok(_) => {}
Err(e) => {
warn!(
"Dropping newly created secrets has encountered an error: {}",
e
);
}
}
Err(err)
}
}
}

Expand Down Expand Up @@ -4353,6 +4375,7 @@ impl Coordinator {
let mut sinks_to_drop = vec![];
let mut indexes_to_drop = vec![];
let mut replication_slots_to_drop: HashMap<String, Vec<String>> = HashMap::new();
let mut secrets_to_drop = vec![];

for op in &ops {
if let catalog::Op::DropItem(id) = op {
Expand Down Expand Up @@ -4390,6 +4413,9 @@ impl Coordinator {
}) => {
indexes_to_drop.push((*compute_instance, *id));
}
CatalogItem::Secret(_) => {
secrets_to_drop.push(*id);
}
_ => (),
}
}
Expand Down Expand Up @@ -4438,6 +4464,9 @@ impl Coordinator {
if !indexes_to_drop.is_empty() {
self.drop_indexes(indexes_to_drop).await;
}
if !secrets_to_drop.is_empty() {
self.drop_secrets(secrets_to_drop).await;
}

// We don't want to block the coordinator on an external postgres server, so
// move the drop slots to a separate task. This does mean that a failed drop
Expand Down Expand Up @@ -4613,6 +4642,20 @@ impl Coordinator {
Ok(())
}

/// Deletes the given secrets from the backing secrets controller.
///
/// Controller errors are logged with `warn!` rather than returned:
/// by the time this runs the caller has no way to recover, so the
/// failure is surfaced in the logs only.
async fn drop_secrets(&mut self, secrets: Vec<GlobalId>) {
    // Build one Delete op per secret id; std `collect` suffices here,
    // no need for itertools' `collect_vec`.
    let ops: Vec<_> = secrets
        .into_iter()
        .map(|id| SecretOp::Delete { id })
        .collect();

    // Idiomatic single-arm error handling instead of a full `match`.
    if let Err(e) = self.secrets_controller.apply(ops) {
        warn!("Dropping secrets has encountered an error: {}", e);
    }
}

/// Finalizes a dataflow and then broadcasts it to all workers.
/// Utility method for the more general [Self::ship_dataflows]
async fn ship_dataflow(&mut self, dataflow: DataflowDesc, instance: ComputeInstanceId) {
Expand Down Expand Up @@ -4839,6 +4882,7 @@ pub async fn serve(
metrics_registry,
persister,
now,
secrets_controller,
}: Config,
) -> Result<(Handle, Client), CoordError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -4885,6 +4929,7 @@ pub async fn serve(
// for bootstrap completion before proceeding.
let (bootstrap_tx, bootstrap_rx) = oneshot::channel();
let handle = TokioHandle::current();

let thread = thread::Builder::new()
.name("coordinator".to_string())
.spawn(move || {
Expand All @@ -4908,6 +4953,7 @@ pub async fn serve(
pending_tails: HashMap::new(),
write_lock: Arc::new(tokio::sync::Mutex::new(())),
write_lock_wait_group: VecDeque::new(),
secrets_controller,
};
let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates));
let ok = bootstrap.is_ok();
Expand Down
3 changes: 3 additions & 0 deletions src/materialized/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ mz-pgwire = { path = "../pgwire" }
mz-pid-file = { path = "../pid-file" }
mz-prof = { path = "../prof" }
mz-repr = { path = "../repr" }
mz-secrets = { path = "../secrets" }
mz-secrets-filesystem = { path = "../secrets-filesystem" }
mz-secrets-kubernetes = { path = "../secrets-kubernetes" }
mz-sql = { path = "../sql" }
nix = "0.23.1"
num_cpus = "1.13.1"
Expand Down
28 changes: 26 additions & 2 deletions src/materialized/src/bin/materialized/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ use lazy_static::lazy_static;
use sysinfo::{ProcessorExt, SystemExt};
use uuid::Uuid;

use materialized::{OrchestratorConfig, RemoteStorageConfig, StorageConfig, TlsConfig, TlsMode};
use materialized::{
OrchestratorConfig, RemoteStorageConfig, SecretsControllerConfig, StorageConfig, TlsConfig,
TlsMode,
};
use mz_coord::{PersistConfig, PersistFileStorage, PersistStorage};
use mz_dataflow_types::sources::AwsExternalId;
use mz_frontegg_auth::{FronteggAuthentication, FronteggConfig};
Expand Down Expand Up @@ -193,6 +196,11 @@ pub struct Args {
#[structopt(long, hide = true, required_if_eq("orchestrator", "kubernetes"))]
dataflowd_image: Option<String>,

// === Secrets Controller options. ===
/// The secrets controller implementation to use
#[structopt(long, hide = true, arg_enum)]
secrets_controller: Option<SecretsController>,

// === Timely worker configuration. ===
/// Number of dataflow worker threads.
#[clap(short, long, env = "MZ_WORKERS", value_name = "N", default_value_t)]
Expand Down Expand Up @@ -460,6 +468,12 @@ enum Orchestrator {
Kubernetes,
}

/// Secrets controller implementations selectable via the hidden
/// `--secrets-controller` command-line flag.
#[derive(ArgEnum, Debug, Clone)]
enum SecretsController {
    /// Store each secret's contents in a file on the local filesystem.
    LocalFileSystem,
    /// Manage secrets via Kubernetes (the controller is still a stub
    /// at this commit, per the change description).
    Kubernetes,
}

#[derive(Debug)]
struct OrchestratorLabel {
key: String,
Expand Down Expand Up @@ -654,7 +668,7 @@ fn run(args: Args) -> Result<(), anyhow::Error> {
}
Some(Orchestrator::Kubernetes) => Some(OrchestratorConfig::Kubernetes {
config: KubernetesOrchestratorConfig {
context: args.kubernetes_context,
context: args.kubernetes_context.clone(),
service_labels: args
.orchestrator_service_label
.into_iter()
Expand All @@ -665,6 +679,15 @@ fn run(args: Args) -> Result<(), anyhow::Error> {
}),
};

// Configure secrets controller.
let secrets_controller = match args.secrets_controller {
None => None,
Some(SecretsController::LocalFileSystem) => Some(SecretsControllerConfig::LocalFileSystem),
Some(SecretsController::Kubernetes) => Some(SecretsControllerConfig::Kubernetes {
context: args.kubernetes_context,
}),
};

// Configure storage.
let data_directory = args.data_directory;
fs::create_dir_all(&data_directory)
Expand Down Expand Up @@ -871,6 +894,7 @@ max log level: {max_log_level}",
cors_allowed_origins: args.cors_allowed_origin,
data_directory,
orchestrator,
secrets_controller,
storage,
experimental_mode: args.experimental,
disable_user_indexes: args.disable_user_indexes,
Expand Down
Loading

0 comments on commit 6dda6aa

Please sign in to comment.