Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Create Rust iml-state-machine #2024

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 175 additions & 122 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ members = [
'iml-services/iml-snapshot',
'iml-services/iml-stats',
'iml-sfa',
'iml-state-machine',
'iml-system-test-utils',
'iml-systemd',
'iml-task-runner',
Expand Down
11 changes: 10 additions & 1 deletion chroma-manager.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass {{WARP_DRIVE_PROXY_PASS}};
proxy_pass {{WARP_DRIVE_PROXY_PASS}}/messaging;
}

location /state_machine {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Server $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass {{WARP_DRIVE_PROXY_PASS}}/state_machine;
}

location /mailbox {
Expand Down
6 changes: 5 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ services:
- "manager-config:/var/lib/chroma"
environment:
- PROXY_HOST=iml-warp-drive
- ACTION_RUNNER_HOST=iml-action-runner
- ACTION_RUNNER_PORT=8009
- RUST_LOG=info,sqlx::query=warn
iml-action-runner:
image: "imlteam/iml-action-runner:6.2.0"
Expand All @@ -348,6 +350,7 @@ services:
environment:
- RUST_LOG=info,sqlx::query=warn
- PROXY_HOST=iml-action-runner
- ACTION_RUNNER_HOST=iml-action-runner
- ACTION_RUNNER_PORT=8009
iml-api:
image: "imlteam/iml-api:6.2.0"
Expand All @@ -360,7 +363,8 @@ services:
- "manager-config:/var/lib/chroma"
- "report:/var/spool/iml/report"
environment:
- PROXY_HOST=iml-api
- PROXY_HOST=nginx
- SERVICE_HOST=iml-api
- RUST_LOG=info,sqlx::query=warn
- BRANDING
- USE_STRATAGEM
Expand Down
4 changes: 2 additions & 2 deletions iml-action-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use bytes::buf::BufExt as _;
use hyper::{client::HttpConnector, Body, Request};
use hyperlocal::{UnixClientExt as _, UnixConnector};
use iml_manager_env::{get_action_runner_http, get_action_runner_uds, running_in_docker};
use iml_manager_env::{get_action_runner_uds, running_in_docker, ACTION_RUNNER_URL};
use iml_wire_types::{Action, ActionId, ActionName, ActionType, AgentResult, Fqdn};
use std::{ops::Deref, sync::Arc};
use thiserror::Error;
Expand Down Expand Up @@ -57,7 +57,7 @@ impl Default for Client {
let (inner, uri) = if running_in_docker() {
(
ClientInner::Http(hyper::Client::new()),
get_action_runner_http().parse::<hyper::Uri>().unwrap(),
ACTION_RUNNER_URL.as_str().parse::<hyper::Uri>().unwrap(),
)
} else {
(
Expand Down
21 changes: 18 additions & 3 deletions iml-api/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

mod state_machine;
mod stratagem;
mod task;

Expand All @@ -12,6 +13,7 @@ use crate::{
};
use chrono::{DateTime, Utc};
use futures::{future::join_all, TryFutureExt, TryStreamExt};
use iml_manager_client::Client;
use iml_postgres::{
active_mgs_host_fqdn, fqdn_by_host_id, sqlx, sqlx::postgres::types::PgInterval, PgPool,
};
Expand Down Expand Up @@ -115,6 +117,9 @@ impl QueryRoot {
fn task(&self) -> task::TaskQuery {
task::TaskQuery
}
fn state_machine(&self) -> state_machine::StateMachineQuery {
state_machine::StateMachineQuery
}
#[graphql(arguments(
limit(description = "optional paging limit, defaults to all rows"),
offset(description = "Offset into items, defaults to 0"),
Expand Down Expand Up @@ -620,6 +625,9 @@ impl MutationRoot {
fn task(&self) -> task::TaskMutation {
task::TaskMutation
}
fn state_machine(&self) -> state_machine::StateMachineMutation {
state_machine::StateMachineMutation
}
#[graphql(arguments(
fsname(description = "Filesystem to snapshot"),
name(description = "Name of the snapshot"),
Expand Down Expand Up @@ -870,8 +878,14 @@ impl MutationRoot {
.map(|x| x.id);

if let Some(id) = maybe_id {
configure_snapshot_timer(id, fsname, interval.0, use_barrier.unwrap_or_default())
.await?;
configure_snapshot_timer(
context.http_client.clone(),
id,
fsname,
interval.0,
use_barrier.unwrap_or_default(),
)
.await?;
}

Ok(true)
Expand All @@ -884,7 +898,7 @@ impl MutationRoot {
.execute(&context.pg_pool)
.await?;

remove_snapshot_timer(id).await?;
remove_snapshot_timer(context.http_client.clone(), id).await?;

Ok(true)
}
Expand Down Expand Up @@ -1117,6 +1131,7 @@ pub(crate) type Schema = RootNode<'static, QueryRoot, MutationRoot, EmptySubscri
pub(crate) struct Context {
pub(crate) pg_pool: PgPool,
pub(crate) rabbit_pool: Pool,
pub(crate) http_client: Client,
}

impl juniper::Context for Context {}
Expand Down
150 changes: 150 additions & 0 deletions iml-api/src/graphql/state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) 2020 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use crate::graphql::Context;
use iml_manager_client::{post, Client, ImlManagerClientError};
use iml_manager_env::get_proxy_url;
use iml_postgres::sqlx;
use iml_wire_types::{
snapshot::{Destroy, Mount, Unmount},
state_machine::{Command, CommandRecord, Job, Transition},
warp_drive::{GraphqlRecordId, RecordId},
};

pub(crate) struct StateMachineMutation;

#[juniper::graphql_object(Context = Context)]
impl StateMachineMutation {
/// Run a state_machine `Transition` for a given record
async fn run_transition(
context: &Context,
record_id: GraphqlRecordId,
transition: Transition,
) -> juniper::FieldResult<CommandRecord> {
let record_id = RecordId::from(record_id);

let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?;

let mut jobs = vec![];

for x in xs {
match (record_id, x) {
(RecordId::Snapshot(x), Transition::MountSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::MountSnapshotJob(Mount {
fsname: x.filesystem_name,
name: x.snapshot_name,
}));
}
(RecordId::Snapshot(x), Transition::UnmountSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::UnmountSnapshotJob(Unmount {
fsname: x.filesystem_name,
name: x.snapshot_name,
}))
}
(RecordId::Snapshot(x), Transition::RemoveSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::RemoveSnapshotJob(Destroy {
fsname: x.filesystem_name,
name: x.snapshot_name,
force: true,
}))
}
_ => {}
}
}

let cmd = Command {
message: "Running Transition".to_string(),
jobs,
};

let mut url = get_proxy_url();

url.set_path("state_machine/run_command/");

let cmd = post(context.http_client.clone(), url.as_str(), cmd)
.await?
.error_for_status()?
.json()
.await?;

Ok(cmd)
}
}

pub(crate) struct StateMachineQuery;

#[juniper::graphql_object(Context = Context)]
impl StateMachineQuery {
/// Given a record, figure out the possible transitions available for it
async fn get_transitions(
context: &Context,
record_id: GraphqlRecordId,
) -> juniper::FieldResult<Vec<Transition>> {
let mut url = get_proxy_url();

url.set_path("state_machine/get_transitions/");

let xs = post(
context.http_client.clone(),
url.as_str(),
RecordId::from(record_id),
)
.await?
.error_for_status()?
.json()
.await?;

Ok(xs)
}
/// Given a record and transition, figure out the shortest possible path for that
/// Record to reach that transition.
async fn get_transition_path(
context: &Context,
record_id: GraphqlRecordId,
transition: Transition,
) -> juniper::FieldResult<Vec<Transition>> {
let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?;

Ok(xs)
}
}

async fn get_transition_path(
client: Client,
record_id: impl Into<RecordId>,
transition: Transition,
) -> Result<Vec<Transition>, ImlManagerClientError> {
let mut url = get_proxy_url();

url.set_path("state_machine/get_transition_path/");

let xs = post(client, url.as_str(), (record_id.into(), transition))
.await?
.error_for_status()?
.json()
.await?;

Ok(xs)
}
6 changes: 5 additions & 1 deletion iml-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod error;
mod graphql;
mod timer;

use iml_manager_client::get_client;
use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
use iml_rabbit::{self, create_connection_filter};
Expand All @@ -22,7 +23,7 @@ const DEFAULT_POOL_LIMIT: u32 = 5;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();

let addr = iml_manager_env::get_iml_api_addr();
let addr = iml_manager_env::get_iml_api_bind_addr();

let conf = Conf {
allow_anonymous_read: iml_manager_env::get_allow_anonymous_read(),
Expand All @@ -42,6 +43,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let pg_pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;

let http_client = get_client()?;

let schema = Arc::new(graphql::Schema::new(
graphql::QueryRoot,
graphql::MutationRoot,
Expand All @@ -52,6 +55,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = Arc::new(graphql::Context {
pg_pool,
rabbit_pool,
http_client,
});
let ctx_filter = warp::any().map(move || Arc::clone(&ctx));

Expand Down
9 changes: 3 additions & 6 deletions iml-api/src/timer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::ImlApiError;
use iml_manager_client::{delete, get_client, put};
use iml_manager_client::{delete, put, Client};
use iml_manager_env::{get_timer_addr, running_in_docker};
use std::time::Duration;

Expand All @@ -12,6 +12,7 @@ pub struct TimerConfig {
}

pub async fn configure_snapshot_timer(
client: Client,
config_id: i32,
fsname: String,
interval: Duration,
Expand Down Expand Up @@ -73,8 +74,6 @@ ExecStart={}
service_config,
};

let client = get_client()?;

let url = format!("http://{}/configure/", get_timer_addr());
tracing::debug!(
"Sending snapshot interval config to timer service: {:?} {:?}",
Expand All @@ -86,9 +85,7 @@ ExecStart={}
Ok(())
}

pub async fn remove_snapshot_timer(config_id: i32) -> Result<(), ImlApiError> {
let client = get_client()?;

pub async fn remove_snapshot_timer(client: Client, config_id: i32) -> Result<(), ImlApiError> {
delete(
client,
format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,16 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass http://127.0.0.1:8890;
proxy_pass http://127.0.0.1:8890/messaging;
}

location /state_machine {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Server $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass http://127.0.0.1:8890/state_machine;
}

location /mailbox {
Expand Down
Loading