From d8c619bacd787ec8b145198ccfaabaaccd922e68 Mon Sep 17 00:00:00 2001 From: VG Date: Wed, 20 Sep 2023 23:42:29 +0800 Subject: [PATCH] chore: refactor into cloud client crate --- Cargo.lock | 34 +- dozer-api/src/rest/api_generator.rs | 3 +- dozer-api/src/rest/mod.rs | 3 +- dozer-api/src/rest/tests/routes.rs | 2 +- dozer-cache/src/lib.rs | 2 +- dozer-cache/src/reader.rs | 8 +- dozer-cli/Cargo.toml | 1 + dozer-cli/src/cli/cloud.rs | 188 ------ dozer-cli/src/cli/mod.rs | 2 - dozer-cli/src/cli/types.rs | 2 +- dozer-cli/src/cloud_app_context.rs | 62 -- dozer-cli/src/cloud_helper.rs | 24 - dozer-cli/src/errors.rs | 126 +--- dozer-cli/src/lib.rs | 47 +- dozer-cli/src/main.rs | 54 +- dozer-cli/src/simple/cloud/deployer.rs | 106 --- dozer-cli/src/simple/cloud/login.rs | 210 ------ dozer-cli/src/simple/cloud/mod.rs | 5 - dozer-cli/src/simple/cloud/monitor.rs | 74 --- .../src/simple/cloud/progress_printer.rs | 36 -- dozer-cli/src/simple/cloud/version.rs | 160 ----- dozer-cli/src/simple/cloud_orchestrator.rs | 606 ------------------ dozer-cli/src/simple/mod.rs | 7 +- dozer-cli/src/simple/token_layer.rs | 49 -- dozer-types/build.rs | 65 -- dozer-types/src/grpc_types.rs | 16 - dozer-types/src/lib.rs | 13 + 27 files changed, 81 insertions(+), 1824 deletions(-) delete mode 100644 dozer-cli/src/cli/cloud.rs delete mode 100644 dozer-cli/src/cloud_app_context.rs delete mode 100644 dozer-cli/src/cloud_helper.rs delete mode 100644 dozer-cli/src/simple/cloud/deployer.rs delete mode 100644 dozer-cli/src/simple/cloud/login.rs delete mode 100644 dozer-cli/src/simple/cloud/mod.rs delete mode 100644 dozer-cli/src/simple/cloud/monitor.rs delete mode 100644 dozer-cli/src/simple/cloud/progress_printer.rs delete mode 100644 dozer-cli/src/simple/cloud/version.rs delete mode 100644 dozer-cli/src/simple/cloud_orchestrator.rs delete mode 100644 dozer-cli/src/simple/token_layer.rs diff --git a/Cargo.lock b/Cargo.lock index bcff98cebf..e0518b3169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1548,9 +1548,9 @@ dependencies = [ [[package]] name = "camino" -version = "1.1.4" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" [[package]] name = "cast" @@ -2640,6 +2640,7 @@ dependencies = [ "ctrlc", "dozer-api", "dozer-cache", + "dozer-cloud-client", "dozer-core", "dozer-ingestion", "dozer-sql", @@ -2669,6 +2670,27 @@ dependencies = [ "zip 0.5.13", ] +[[package]] +name = "dozer-cloud-client" +version = "0.1.0" +dependencies = [ + "camino", + "chrono", + "clap 4.4.1", + "dozer-types", + "futures", + "futures-util", + "glob", + "prost 0.12.0", + "prost-types 0.12.0", + "reqwest", + "rustyline", + "tokio", + "tonic 0.10.0", + "tonic-build", + "tower", +] + [[package]] name = "dozer-core" version = "0.1.39" @@ -3351,9 +3373,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", @@ -3382,9 +3404,9 @@ checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ "futures-core", "futures-task", diff --git a/dozer-api/src/rest/api_generator.rs b/dozer-api/src/rest/api_generator.rs index 8852be0a6e..5ae6c20527 100644 --- a/dozer-api/src/rest/api_generator.rs +++ b/dozer-api/src/rest/api_generator.rs @@ -4,7 +4,8 @@ use actix_web::web::ReqData; use actix_web::{web, HttpResponse}; use dozer_cache::cache::expression::{QueryExpression, Skip}; use dozer_cache::cache::CacheRecord; -use dozer_cache::{CacheReader, Phase}; +use dozer_cache::CacheReader; +use dozer_types::api::Phase; use dozer_types::errors::types::CannotConvertF64ToJson; use dozer_types::indexmap::IndexMap; use dozer_types::models::api_endpoint::ApiEndpoint; diff --git a/dozer-api/src/rest/mod.rs b/dozer-api/src/rest/mod.rs index 0c63f0948e..df598417e0 100644 --- a/dozer-api/src/rest/mod.rs +++ b/dozer-api/src/rest/mod.rs @@ -20,6 +20,7 @@ use actix_web::{ }; use actix_web_httpauth::middleware::HttpAuthentication; use dozer_tracing::LabelsAndProgress; +use dozer_types::api::DOZER_SERVER_NAME_HEADER; use dozer_types::{log::info, models::api_config::RestApiOptions}; use dozer_types::{ models::api_security::ApiSecurity, @@ -38,8 +39,6 @@ enum CorsOptions { Custom(Vec, usize), } -pub const DOZER_SERVER_NAME_HEADER: &str = "x-dozer-server-name"; - #[derive(Clone)] pub struct ApiServer { shutdown_timeout: u64, diff --git a/dozer-api/src/rest/tests/routes.rs b/dozer-api/src/rest/tests/routes.rs index d244b21f6e..0a5727ee64 100644 --- a/dozer-api/src/rest/tests/routes.rs +++ b/dozer-api/src/rest/tests/routes.rs @@ -5,7 +5,7 @@ use crate::{generator::oapi::generator::OpenApiGenerator, test_utils, CacheEndpo use actix_http::{body::MessageBody, Request}; use actix_web::dev::{Service, ServiceResponse}; use actix_web::http::header::ContentType; -use dozer_cache::Phase; +use dozer_types::api::Phase; use dozer_types::models::api_endpoint::ApiEndpoint; use dozer_types::serde_json::{json, Value}; use http::StatusCode; diff --git a/dozer-cache/src/lib.rs b/dozer-cache/src/lib.rs index 1ca841385c..24125bda85 100644 --- a/dozer-cache/src/lib.rs +++ b/dozer-cache/src/lib.rs @@ -3,4 +3,4 @@ pub mod errors; mod reader; pub use dozer_log; -pub use reader::{AccessFilter, CacheReader, Phase}; +pub use reader::{AccessFilter, CacheReader}; diff --git a/dozer-cache/src/reader.rs b/dozer-cache/src/reader.rs index 5095a703c4..2dabf9582f 100644 --- a/dozer-cache/src/reader.rs +++ b/dozer-cache/src/reader.rs @@ -3,6 +3,7 @@ use crate::cache::{expression::QueryExpression, CacheRecord, RoCache}; use super::cache::expression::FilterExpression; use crate::errors::CacheError; use dozer_types::{ + api::Phase, serde, types::{Record, SchemaWithIndex}, }; @@ -21,13 +22,6 @@ pub struct AccessFilter { pub fields: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(crate = "dozer_types::serde")] -pub enum Phase { - Snapshotting, - Streaming, -} - #[derive(Debug)] /// CacheReader dynamically attaches permissions on top of queries pub struct CacheReader { diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index f57a297196..84aebc9bab 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -10,6 +10,7 @@ authors = ["getdozer/dozer-dev"] name = "dozer" [dependencies] +dozer-cloud-client = { path = "../../dozer-cloud-client" } dozer-api = { path = "../dozer-api" } dozer-ingestion = { path = "../dozer-ingestion" } dozer-core = { path = "../dozer-core" } diff --git a/dozer-cli/src/cli/cloud.rs b/dozer-cli/src/cli/cloud.rs deleted file mode 100644 index c22788ce9d..0000000000 --- a/dozer-cli/src/cli/cloud.rs +++ /dev/null @@ -1,188 +0,0 @@ -use clap::{Args, Subcommand}; - -use clap::ArgAction; -use dozer_types::grpc_types::cloud::Secret; -use std::error::Error; - -#[derive(Debug, Args)] -#[command(args_conflicts_with_subcommands = true)] -pub struct Cloud { - #[arg(global = true, short = 't', long)] - pub target_url: Option, - - #[arg(global = true, short, long)] - pub app_id: Option, - - #[arg(global = true, short, long)] - pub profile: Option, - - #[command(subcommand)] - pub command: CloudCommands, -} - -#[derive(Debug, Subcommand, Clone)] -pub enum CloudCommands { - /// Login to Dozer Cloud service - Login { - #[arg(long = "organisation_slug")] - organisation_slug: Option, - - #[arg(global = true, long = "profile_name")] - profile_name: Option, - - #[arg(global = true, long = "client_id")] - client_id: Option, - - #[arg(global = true, long = "client_secret")] - client_secret: Option, - }, - /// Deploy application to Dozer Cloud - Deploy(DeployCommandArgs), - /// Stop and delete application from Dozer Cloud - Delete, - /// Get status of running application in Dozer Cloud - Status, - /// Monitor processed data amount in Dozer Cloud - Monitor, - /// Inspect application logs - Logs(LogCommandArgs), - /// Dozer application version management - #[command(subcommand)] - Version(VersionCommand), - /// Set application, which will be used for all commands - SetApp { - /// App id of application which will be used for all commands - app_id: String, - }, - /// List all dozer application in Dozer Cloud - List(ListCommandArgs), - /// Dozer app secrets management - #[command(subcommand)] - Secrets(SecretsCommand), -} - -#[derive(Debug, Args, Clone)] -pub struct DeployCommandArgs { - /// List of secrets which will be used in deployment - #[arg(short, long, value_parser = parse_key_val)] - pub secrets: Vec, - - #[arg(long = "no-lock", action = ArgAction::SetFalse)] - pub locked: bool, - - #[arg(long = "allow-incompatible")] - pub allow_incompatible: bool, - - #[arg(long = "follow")] - pub follow: bool, -} - -pub fn default_num_api_instances() -> i32 { - 2 -} - -#[derive(Debug, Args, Clone)] -pub struct LogCommandArgs { - /// Whether to follow the logs - #[arg(short, long)] - pub follow: bool, - - /// The deployment to inspect - #[arg(short, long)] - pub deployment: Option, - - /// Ignore app logs - #[arg(long, default_value = "false", action=ArgAction::SetTrue)] - pub ignore_app: bool, - - /// Ignore api logs - #[arg(long, default_value = "false", action=ArgAction::SetTrue)] - pub ignore_api: bool, - - /// Ignore build logs - #[arg(long, default_value = "false", action=ArgAction::SetTrue)] - pub ignore_build: bool, -} - -#[derive(Debug, Args, Clone)] -pub struct ListCommandArgs { - /// Offset of the list - #[arg(short = 'o', long)] - pub offset: Option, - - /// Limit of the list - #[arg(short = 'l', long)] - pub limit: Option, - - /// Filter of application name - #[arg(short = 'n', long)] - pub name: Option, - - /// Filter of application uuid - #[arg(short = 'u', long)] - pub uuid: Option, -} - -#[derive(Debug, Clone, Subcommand)] -pub enum VersionCommand { - /// Inspects the status of a version, compared to the current version if it's not current. - Status { - /// The version to inspect - version: u32, - }, - /// Creates a new version of the application with the given deployment - Create { - /// The deployment of the application to create a new version from - deployment: u32, - }, - /// Sets a version as the "current" version of the application - /// - /// Current version of an application can be visited without the "/v" prefix. - SetCurrent { - /// The version to set as current - version: u32, - }, -} - -#[derive(Debug, Clone, Subcommand)] -pub enum SecretsCommand { - /// Creates new secret - Create { - /// Name of secret - name: String, - - /// Value of secret - value: String, - }, - /// Update secret value - Update { - /// Name of secret - name: String, - - /// Value of secret - value: String, - }, - /// Delete secret - Delete { - /// Name of secret - name: String, - }, - /// Get secret - Get { - /// Name of secret - name: String, - }, - /// List all app secrets - List {}, -} - -fn parse_key_val(s: &str) -> Result> { - let pos = s - .find('=') - .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?; - - Ok(Secret { - name: s[..pos].parse()?, - value: s[pos + 1..].parse()?, - }) -} diff --git a/dozer-cli/src/cli/mod.rs b/dozer-cli/src/cli/mod.rs index eb1de27547..a786f4957c 100644 --- a/dozer-cli/src/cli/mod.rs +++ b/dozer-cli/src/cli/mod.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "cloud")] -pub mod cloud; mod helper; mod init; pub mod types; diff --git a/dozer-cli/src/cli/types.rs b/dozer-cli/src/cli/types.rs index 6e9922bc60..cbfb350bf7 100644 --- a/dozer-cli/src/cli/types.rs +++ b/dozer-cli/src/cli/types.rs @@ -3,7 +3,7 @@ use clap::{Args, Parser, Subcommand}; use super::helper::{DESCRIPTION, LOGO}; #[cfg(feature = "cloud")] -use crate::cli::cloud::Cloud; +use dozer_cloud_client::cli::cloud::Cloud; use dozer_types::{ constants::{DEFAULT_CONFIG_PATH_PATTERNS, LOCK_FILE}, serde_json, diff --git a/dozer-cli/src/cloud_app_context.rs b/dozer-cli/src/cloud_app_context.rs deleted file mode 100644 index c9dab5dc1a..0000000000 --- a/dozer-cli/src/cloud_app_context.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::errors::CloudContextError; -use crate::errors::CloudContextError::{AppIdNotFound, FailedToGetDirectoryPath}; -use dozer_types::models::cloud::Cloud; -use dozer_types::serde::Serialize; -use dozer_types::serde_yaml; -use std::io::Write; -use std::{env, fs}; - -#[derive(Serialize)] -#[serde(crate = "dozer_types::serde")] -pub struct CloudConfig { - pub cloud: Cloud, -} - -pub struct CloudAppContext {} - -impl CloudAppContext { - fn get_file_path() -> Result { - Ok(format!( - "{}/{}", - env::current_dir()? - .into_os_string() - .into_string() - .map_err(|_| FailedToGetDirectoryPath)?, - "dozer-config.cloud.yaml" - )) - } - - pub fn delete_config_file() -> Result<(), CloudContextError> { - let file_path = Self::get_file_path()?; - fs::remove_file(file_path)?; - Ok(()) - } - - pub fn get_app_id(config: Option<&Cloud>) -> Result { - match &config { - None => Err(AppIdNotFound), - Some(cloud_config) => cloud_config.app_id.clone().ok_or(AppIdNotFound), - } - } - - pub fn save_app_id(app_id: String) -> Result<(), CloudContextError> { - let file_path = Self::get_file_path()?; - let mut f = fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(file_path)?; - - let config = CloudConfig { - cloud: Cloud { - app_id: Some(app_id), - ..Default::default() - }, - }; - - let config_string = serde_yaml::to_string(&config).unwrap(); - f.write_all(config_string.as_bytes())?; - - Ok(()) - } -} diff --git a/dozer-cli/src/cloud_helper.rs b/dozer-cli/src/cloud_helper.rs deleted file mode 100644 index 4785bab02f..0000000000 --- a/dozer-cli/src/cloud_helper.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::errors::ConfigCombineError::{ - CannotReadConfig, CannotReadFile, WrongPatternOfConfigFilesGlob, -}; - -use dozer_types::grpc_types::cloud::File; -use glob::glob; -use std::fs; - -pub fn list_files(config_paths: Vec) -> Result, crate::errors::CloudError> { - let mut files = vec![]; - for pattern in config_paths { - let files_glob = glob(&pattern).map_err(WrongPatternOfConfigFilesGlob)?; - - for entry in files_glob { - let path = entry.map_err(CannotReadFile)?; - files.push(File { - name: path.clone().to_str().unwrap().to_string(), - content: fs::read_to_string(path.clone()).map_err(|e| CannotReadConfig(path, e))?, - }); - } - } - - Ok(files) -} diff --git a/dozer-cli/src/errors.rs b/dozer-cli/src/errors.rs index 740a640f7a..0c0990b63b 100644 --- a/dozer-cli/src/errors.rs +++ b/dozer-cli/src/errors.rs @@ -1,37 +1,21 @@ #![allow(clippy::enum_variant_names)] -use glob::{GlobError, PatternError}; -use std::io; -use std::path::PathBuf; -use tonic::Code::NotFound; - -use crate::{ - errors::CloudError::{ApplicationNotFound, CloudServiceError}, - live::LiveError, -}; -use dozer_api::{ - errors::{ApiInitError, AuthError, GenerationError, GrpcError}, - rest::DOZER_SERVER_NAME_HEADER, -}; +use crate::live::LiveError; +use dozer_api::errors::{ApiInitError, AuthError, GenerationError, GrpcError}; use dozer_cache::dozer_log::storage; use dozer_cache::errors::CacheError; +use dozer_cloud_client::errors::CloudError; use dozer_core::errors::ExecutionError; use dozer_ingestion::errors::ConnectorError; use dozer_sql::errors::PipelineError; use dozer_types::{constants::LOCK_FILE, thiserror::Error}; use dozer_types::{errors::internal::BoxedError, serde_json}; use dozer_types::{serde_yaml, thiserror}; +use glob::{GlobError, PatternError}; +use std::path::PathBuf; use crate::pipeline::connector_source::ConnectorSourceFactoryError; -pub fn map_tonic_error(e: tonic::Status) -> CloudError { - if e.code() == NotFound && e.message() == "Failed to find app" { - ApplicationNotFound - } else { - CloudServiceError(e) - } -} - #[derive(Error, Debug)] pub enum OrchestrationError { #[error("Failed to write config yaml: {0:?}")] @@ -42,18 +26,12 @@ pub enum OrchestrationError { NoBuildFound, #[error("Failed to create log: {0}")] CreateLog(#[from] dozer_cache::dozer_log::replication::Error), - #[error("Failed to login: {0}")] - CloudLoginFailed(#[from] CloudLoginError), - #[error("Credential Error: {0}")] - CredentialError(#[from] CloudCredentialError), #[error("Failed to build: {0}")] BuildFailed(#[from] BuildError), #[error("Failed to generate token: {0}")] GenerateTokenFailed(#[source] AuthError), #[error("Missing api config or security input")] MissingSecurityConfig, - #[error("Cloud service error: {0}")] - CloudError(#[from] CloudError), #[error("Failed to initialize api server: {0}")] ApiInitFailed(#[from] ApiInitError), #[error("Failed to server REST API: {0}")] @@ -80,6 +58,8 @@ pub enum OrchestrationError { PipelineError(#[from] PipelineError), #[error(transparent)] CliError(#[from] CliError), + #[error(transparent)] + CloudError(#[from] CloudError), #[error("table_name: {0:?} not found in any of the connections")] SourceValidationError(String), #[error("connection: {0:?} not found")] @@ -93,10 +73,6 @@ pub enum OrchestrationError { #[error("No endpoints initialized in the config provided")] EmptyEndpoints, #[error(transparent)] - CloudContextError(#[from] CloudContextError), - #[error("Failed to read organisation name. Error: {0}")] - FailedToReadOrganisationName(#[source] io::Error), - #[error(transparent)] LiveError(#[from] LiveError), #[error("{LOCK_FILE} is out of date")] LockedOutdatedLockfile, @@ -137,39 +113,6 @@ pub enum CliError { Io(#[from] std::io::Error), } -#[derive(Error, Debug)] -pub enum CloudError { - #[error("Connection failed. Error: {0:?}")] - ConnectionToCloudServiceError(#[from] tonic::transport::Error), - - #[error("Cloud service returned error: {0:?}")] - CloudServiceError(#[from] tonic::Status), - - #[error("GRPC request failed, error: {} (GRPC status {})", .0.message(), .0.code())] - GRPCCallError(#[source] tonic::Status), - - #[error(transparent)] - CloudCredentialError(#[from] CloudCredentialError), - - #[error("Reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), - - #[error("Response header {DOZER_SERVER_NAME_HEADER} is missing")] - MissingResponseHeader, - - #[error(transparent)] - CloudContextError(#[from] CloudContextError), - - #[error(transparent)] - ConfigCombineError(#[from] ConfigCombineError), - - #[error("Application not found")] - ApplicationNotFound, - - #[error("{LOCK_FILE} not found. Run `dozer build` before deploying, or pass '--no-lock'.")] - LockfileNotFound, -} - #[derive(Debug, Error)] pub enum ConfigCombineError { #[error("Failed to parse yaml file {0}: {1}")] @@ -227,58 +170,3 @@ pub enum BuildError { #[error("Storage error: {0}")] Storage(#[from] storage::Error), } - -#[derive(Debug, Error)] -pub enum CloudLoginError { - #[error("Tonic error: {0}")] - TonicError(#[from] tonic::Status), - - #[error("Transport error: {0}")] - Transport(#[from] tonic::transport::Error), - - #[error("HttpRequest error: {0}")] - HttpRequestError(#[from] reqwest::Error), - - #[error(transparent)] - SerializationError(#[from] dozer_types::serde_json::Error), - - #[error("Failed to read input: {0}")] - InputError(#[from] std::io::Error), - - #[error(transparent)] - CloudCredentialError(#[from] CloudCredentialError), - - #[error("Organisation not found")] - OrganisationNotFound, -} - -#[derive(Debug, Error)] -pub enum CloudCredentialError { - #[error(transparent)] - SerializationError(#[from] dozer_types::serde_yaml::Error), - - #[error(transparent)] - JsonSerializationError(#[from] dozer_types::serde_json::Error), - #[error("Failed to create home directory: {0}")] - FailedToCreateDirectory(#[from] std::io::Error), - - #[error("HttpRequest error: {0}")] - HttpRequestError(#[from] reqwest::Error), - - #[error("Missing credentials.yaml file - Please try to login again")] - MissingCredentialFile, - #[error("There's no profile with given name - Please try to login again")] - MissingProfile, -} - -#[derive(Debug, Error)] -pub enum CloudContextError { - #[error("Failed to create access directory: {0}")] - FailedToAccessDirectory(#[from] std::io::Error), - - #[error("Failed to get current directory path")] - FailedToGetDirectoryPath, - - #[error("App id not found in configuration. You need to run \"deploy\" or \"set-app\" first")] - AppIdNotFound, -} diff --git a/dozer-cli/src/lib.rs b/dozer-cli/src/lib.rs index 8ed645d374..8cdcdf2196 100644 --- a/dozer-cli/src/lib.rs +++ b/dozer-cli/src/lib.rs @@ -15,44 +15,12 @@ use std::{ thread::current, }; use tokio::task::JoinHandle; -#[cfg(feature = "cloud")] -pub mod cloud_app_context; -#[cfg(feature = "cloud")] -mod cloud_helper; pub mod config_helper; pub mod console_helper; #[cfg(test)] mod tests; mod utils; -#[cfg(feature = "cloud")] -pub trait CloudOrchestrator { - fn deploy( - &mut self, - cloud: Cloud, - deploy: DeployCommandArgs, - config_paths: Vec, - ) -> Result<(), OrchestrationError>; - fn delete(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>; - fn list(&mut self, cloud: Cloud, list: ListCommandArgs) -> Result<(), OrchestrationError>; - fn status(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>; - fn monitor(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>; - fn trace_logs(&mut self, cloud: Cloud, logs: LogCommandArgs) -> Result<(), OrchestrationError>; - fn login( - &mut self, - cloud: Cloud, - organisation_slug: Option, - profile: Option, - client_id: Option, - client_secret: Option, - ) -> Result<(), OrchestrationError>; - fn execute_secrets_command( - &mut self, - cloud: Cloud, - command: SecretsCommand, - ) -> Result<(), OrchestrationError>; -} - // Re-exports pub use dozer_ingestion::{ connectors::{get_connector, TableInfo}, @@ -64,13 +32,20 @@ pub fn wrapped_statement_to_pipeline(sql: &str) -> Result>, ) -> Result<(), OrchestrationError> { diff --git a/dozer-cli/src/main.rs b/dozer-cli/src/main.rs index 5b7b081312..516fe119a3 100644 --- a/dozer-cli/src/main.rs +++ b/dozer-cli/src/main.rs @@ -1,14 +1,13 @@ +use ::dozer_cloud_client::run_cloud; use clap::Parser; -#[cfg(feature = "cloud")] -use dozer_cli::cli::cloud::CloudCommands; use dozer_cli::cli::generate_config_repl; use dozer_cli::cli::types::{Cli, Commands, ConnectorCommand, RunCommands, SecurityCommands}; use dozer_cli::cli::{init_dozer, list_sources, LOGO}; -use dozer_cli::errors::{CliError, CloudError, OrchestrationError}; +use dozer_cli::errors::{CliError, OrchestrationError}; use dozer_cli::simple::SimpleOrchestrator; -#[cfg(feature = "cloud")] -use dozer_cli::CloudOrchestrator; use dozer_cli::{live, set_ctrl_handler, set_panic_hook, shutdown}; +use dozer_cloud_client::errors::display_cloud_error; + use dozer_tracing::LabelsAndProgress; use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig}; use dozer_types::serde::Deserialize; @@ -17,8 +16,6 @@ use tokio::runtime::Runtime; use tokio::time; use clap::CommandFactory; -#[cfg(feature = "cloud")] -use dozer_cli::cloud_app_context::CloudAppContext; use std::cmp::Ordering; use std::sync::Arc; @@ -196,34 +193,13 @@ fn run() -> Result<(), OrchestrationError> { #[cfg(feature = "cloud")] Commands::Cloud(cloud) => { render_logo(); - - match cloud.command.clone() { - CloudCommands::Deploy(deploy) => dozer.deploy(cloud, deploy, cli.config_paths), - CloudCommands::Login { - organisation_slug, - profile_name, - client_id, - client_secret, - } => dozer.login( - cloud, - organisation_slug, - profile_name, - client_id, - client_secret, - ), - CloudCommands::Secrets(command) => dozer.execute_secrets_command(cloud, command), - CloudCommands::Delete => dozer.delete(cloud), - CloudCommands::Status => dozer.status(cloud), - CloudCommands::Monitor => dozer.monitor(cloud), - CloudCommands::Logs(logs) => dozer.trace_logs(cloud, logs), - CloudCommands::Version(version) => dozer.version(cloud, version), - CloudCommands::List(list) => dozer.list(cloud, list), - CloudCommands::SetApp { app_id } => { - CloudAppContext::save_app_id(app_id.clone())?; - info!("Using \"{app_id}\" app"); - Ok(()) - } - } + run_cloud( + dozer.runtime.clone(), + dozer.config.clone(), + cloud, + cli.config_paths, + ) + .map_err(OrchestrationError::CloudError) } Commands::Init => { panic!("This should not happen as it is handled in parse_and_generate"); @@ -303,12 +279,8 @@ fn init_orchestrator(cli: &Cli) -> Result { } fn display_error(e: &OrchestrationError) { - if let OrchestrationError::CloudError(CloudError::ApplicationNotFound) = &e { - let description = "Dozer cloud service was not able to find application. \n\n\ - Please check your application id in `dozer-config.cloud.yaml` file.\n\ - To change it, you can manually update file or use \"dozer cloud set-app {app_id}\"."; - - error!("{}", description); + if let OrchestrationError::CloudError(e) = &e { + display_cloud_error(e); } else { error!("{}", e); } diff --git a/dozer-cli/src/simple/cloud/deployer.rs b/dozer-cli/src/simple/cloud/deployer.rs deleted file mode 100644 index 346043c500..0000000000 --- a/dozer-cli/src/simple/cloud/deployer.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::errors::CloudError; - -use crate::simple::cloud::progress_printer::ProgressPrinter; -use crate::simple::token_layer::TokenLayer; -use dozer_types::grpc_types::cloud::dozer_cloud_client::DozerCloudClient; -use dozer_types::grpc_types::cloud::DeploymentStatus; -use dozer_types::grpc_types::cloud::GetDeploymentStatusRequest; - -use dozer_types::grpc_types::cloud::DeployAppRequest; -use dozer_types::grpc_types::cloud::File; -use dozer_types::grpc_types::cloud::{Secret, StopRequest, StopResponse}; -use dozer_types::log::info; - -pub async fn deploy_app( - client: &mut DozerCloudClient, - app_id: &Option, - secrets: Vec, - allow_incompatible: bool, - files: Vec, - follow: bool, -) -> Result<(), CloudError> { - let response = client - .deploy_application(DeployAppRequest { - app_id: app_id.clone(), - secrets, - allow_incompatible, - files, - }) - .await? - .into_inner(); - - let app_id = response.app_id; - let deployment_id = response.deployment_id; - let url = response.deployment_url; - info!("Deploying new application with App Id: {app_id}, Deployment Id: {deployment_id}"); - info!("Follow the deployment progress at {url}"); - - if follow { - print_progress(client, app_id, deployment_id).await?; - } - - Ok::<(), CloudError>(()) -} - -async fn print_progress( - client: &mut DozerCloudClient, - app_id: String, - deployment_id: String, -) -> Result<(), CloudError> { - let mut current_step = 0; - let mut printer = ProgressPrinter::new(); - let request = GetDeploymentStatusRequest { - app_id, - deployment_id, - }; - loop { - let response = client - .get_deployment_status(request.clone()) - .await? - .into_inner(); - - if response.status == DeploymentStatus::Success as i32 { - info!("Deployment completed successfully"); - break; - } else if response.status == DeploymentStatus::Failed as i32 { - info!("Deployment failed!"); - break; - } else { - let steps = response.steps.clone(); - let completed_steps = response - .steps - .into_iter() - .filter(|s| { - s.status == DeploymentStatus::Success as i32 && s.step_index >= current_step - }) - .map(|s| (s.step_index, s.step_text)) - .collect::>(); - - for (step_no, text) in completed_steps.iter() { - printer.complete_step(*step_no, text) - } - - if !completed_steps.is_empty() { - current_step = completed_steps.last().unwrap().0 + 1; - let text = steps[current_step as usize].step_text.clone(); - printer.start_step(current_step, &text); - } - } - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - Ok::<(), CloudError>(()) -} - -pub async fn stop_app( - client: &mut DozerCloudClient, - app_id: &str, -) -> Result { - let result = client - .stop_dozer(StopRequest { - app_id: app_id.to_string(), - }) - .await? - .into_inner(); - - Ok(result) -} diff --git a/dozer-cli/src/simple/cloud/login.rs b/dozer-cli/src/simple/cloud/login.rs deleted file mode 100644 index 8ae342d1be..0000000000 --- a/dozer-cli/src/simple/cloud/login.rs +++ /dev/null @@ -1,210 +0,0 @@ -use crate::errors::{CloudCredentialError, CloudLoginError}; -use std::collections::HashMap; -use std::{env, fs, io}; -use tonic::Code::NotFound; - -use crate::errors::CloudLoginError::OrganisationNotFound; -use dozer_types::grpc_types::cloud::company_request::Criteria; -use dozer_types::grpc_types::cloud::dozer_public_client::DozerPublicClient; -use dozer_types::grpc_types::cloud::CompanyRequest; - -use dozer_types::serde::{Deserialize, Serialize}; -use dozer_types::serde_json::{self, Value}; -use dozer_types::serde_yaml; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(crate = "dozer_types::serde")] -pub struct CredentialInfo { - pub profile_name: String, - pub client_id: String, - pub client_secret: String, - pub target_url: String, - pub auth_url: String, -} -const DOZER_FOLDER: &str = ".dozer"; -const CREDENTIALS_FILE_NAME: &str = "credentials.yaml"; - -impl CredentialInfo { - fn get_directory_path() -> String { - let home_dir = env::var("HOME").unwrap_or_else(|_| ".".to_string()); - format!("{}/{}", home_dir, DOZER_FOLDER) - } - - fn get_file_path() -> String { - let file_path = format!( - "{}/{}", - CredentialInfo::get_directory_path(), - CREDENTIALS_FILE_NAME - ); - file_path - } - pub fn save(&self) -> Result<(), CloudCredentialError> { - let file_path: String = CredentialInfo::get_file_path(); - fs::create_dir_all(CredentialInfo::get_directory_path()) - .map_err(CloudCredentialError::FailedToCreateDirectory)?; - let f = std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(file_path)?; - let mut current_credential_infos: Vec = CredentialInfo::read_profile()?; - current_credential_infos.append(&mut vec![self.clone()]); - current_credential_infos.dedup_by_key(|key| key.to_owned().profile_name); - serde_yaml::to_writer(f, ¤t_credential_infos)?; - Ok(()) - } - - fn read_profile() -> Result, CloudCredentialError> { - let file_path = CredentialInfo::get_file_path(); - - let file = std::fs::File::open(file_path) - .map_err(|_e| CloudCredentialError::MissingCredentialFile)?; - serde_yaml::from_reader::>(file) - .map_err(CloudCredentialError::SerializationError) - } - - pub fn load(name: Option) -> Result { - let credential_info: Vec = CredentialInfo::read_profile()?; - match name { - Some(name) => { - let credential_info = credential_info - .into_iter() - .find(|info| info.profile_name == name) - .ok_or(CloudCredentialError::MissingProfile)?; - Ok(credential_info) - } - _ => credential_info - .into_iter() - .next() - .ok_or(CloudCredentialError::MissingProfile), - } - } - - pub async fn get_access_token(&self) -> Result { - let client = reqwest::Client::builder() - .build() - .map_err(CloudCredentialError::HttpRequestError)?; - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - "Content-Type", - "application/x-www-form-urlencoded".parse().unwrap(), - ); - - let mut params: HashMap<&str, &str> = HashMap::new(); - params.insert("grant_type", "client_credentials"); - params.insert("client_id", self.client_id.as_str()); - params.insert("client_secret", self.client_secret.as_str()); - let request = client - .request(reqwest::Method::POST, self.auth_url.to_owned()) - .headers(headers) - .form(¶ms); - let response = request - .send() - .await - .map_err(CloudCredentialError::HttpRequestError)?; - let json_response: Value = response - .json() - .await - .map_err(CloudCredentialError::HttpRequestError)?; - serde_json::from_value::(json_response) - .map_err(CloudCredentialError::JsonSerializationError) - } -} - -pub struct LoginSvc { - auth_url: String, - target_url: String, -} -#[derive(Eq, PartialEq, Clone, Serialize, Deserialize, Debug)] -#[serde(crate = "dozer_types::serde")] -pub struct TokenResponse { - pub access_token: String, - pub token_type: String, - pub expires_in: i32, -} -impl LoginSvc { - pub async fn new( - organisation_slug: String, - target_url: String, - ) -> Result { - let mut client = DozerPublicClient::connect(target_url.to_owned()).await?; - let company_info = client - .company_metadata(CompanyRequest { - criteria: Some(Criteria::Slug(organisation_slug.to_owned())), - }) - .await - .map_err(|e| { - if e.code() == NotFound { - OrganisationNotFound - } else { - CloudLoginError::from(e) - } - })?; - - let company_info = company_info.into_inner(); - Ok(Self { - auth_url: company_info.auth_url, - target_url, - }) - } - pub async fn login( - &self, - profile: Option, - client_id: Option, - client_secret: Option, - ) -> Result<(), CloudLoginError> { - self.login_by_credential(profile, client_id, client_secret) - .await - } - - async fn login_by_credential( - &self, - profile: Option, - client_id: Option, - client_secret: Option, - ) -> Result<(), CloudLoginError> { - let profile_name = match profile { - Some(profile) => profile, - None => { - let mut profile_name = String::new(); - println!("Please enter profile name:"); - io::stdin().read_line(&mut profile_name)?; - profile_name.trim().to_owned() - } - }; - - let client_id = match client_id { - Some(client_id) => client_id, - None => { - let mut client_id = String::new(); - println!("Please enter your client_id:"); - io::stdin().read_line(&mut client_id)?; - client_id.trim().to_owned() - } - }; - - let client_secret = match client_secret { - Some(secret) => secret, - None => { - let mut client_secret = String::new(); - println!("Please enter your client_secret:"); - io::stdin().read_line(&mut client_secret)?; - client_secret.trim().to_owned() - } - }; - - let credential_info = CredentialInfo { - client_id, - client_secret, - profile_name, - target_url: self.target_url.to_owned(), - auth_url: self.auth_url.to_owned(), - }; - - let token = credential_info.get_access_token().await?; - println!("Temporary bearer token: {}\n", token.access_token); - credential_info.save()?; - println!("Login success !"); - Ok(()) - } -} diff --git a/dozer-cli/src/simple/cloud/mod.rs b/dozer-cli/src/simple/cloud/mod.rs deleted file mode 100644 index b76b2213a2..0000000000 --- a/dozer-cli/src/simple/cloud/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod deployer; -pub mod login; -pub mod monitor; -pub mod progress_printer; -pub mod version; diff --git a/dozer-cli/src/simple/cloud/monitor.rs b/dozer-cli/src/simple/cloud/monitor.rs deleted file mode 100644 index 09a200bb2c..0000000000 --- a/dozer-cli/src/simple/cloud/monitor.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::cli::cloud::Cloud; -use crate::cloud_app_context::CloudAppContext; -use crate::errors::CloudError; -use crate::simple::cloud_orchestrator::get_cloud_client; -use crate::simple::token_layer::TokenLayer; -use dozer_types::grpc_types::cloud::dozer_cloud_client::DozerCloudClient; -use dozer_types::grpc_types::cloud::StatusUpdate; -use dozer_types::grpc_types::cloud::StatusUpdateRequest; -use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::runtime::Runtime; - -pub fn monitor_app( - cloud: &Cloud, - cloud_config: Option<&dozer_types::models::cloud::Cloud>, - runtime: Arc, -) -> Result<(), CloudError> { - let app_id = cloud - .app_id - .clone() - .unwrap_or(CloudAppContext::get_app_id(cloud_config)?); - - runtime.block_on(async move { - let mut client: DozerCloudClient = - get_cloud_client(cloud, cloud_config).await?; - let mut response = client - .on_status_update(StatusUpdateRequest { app_id }) - .await? - .into_inner(); - - let mut bars: HashMap = HashMap::new(); - let m = MultiProgress::new(); - - while let Some(StatusUpdate { source, count, .. }) = response.message().await? { - match bars.get(&source) { - None => { - let pb = attach_progress(&m); - pb.set_message(source.clone()); - pb.set_position(count); - bars.insert(source.clone(), pb); - } - Some(bar) => { - bar.set_position(count); - } - }; - } - - Ok::<(), CloudError>(()) - })?; - - Ok(()) -} - -fn attach_progress(m: &MultiProgress) -> ProgressBar { - let pb = ProgressBar::new_spinner(); - m.add(pb.clone()); - pb.set_style( - ProgressStyle::with_template("{spinner:.red} {msg}: {pos}: {per_sec}") - .unwrap() - // For more spinners check out the cli-spinners project: - // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json - .tick_strings(&[ - "▹▹▹▹▹", - "▸▹▹▹▹", - "▹▸▹▹▹", - "▹▹▸▹▹", - "▹▹▹▸▹", - "▹▹▹▹▸", - "▪▪▪▪▪", - ]), - ); - pb -} diff --git a/dozer-cli/src/simple/cloud/progress_printer.rs b/dozer-cli/src/simple/cloud/progress_printer.rs deleted file mode 100644 index 5bdd7577b5..0000000000 --- a/dozer-cli/src/simple/cloud/progress_printer.rs +++ /dev/null @@ -1,36 +0,0 @@ -use crate::console_helper::get_colored_text; -use crate::console_helper::GREEN; -use crate::console_helper::YELLOW; -use dozer_types::indicatif::ProgressBar; - -pub struct ProgressPrinter { - pb: ProgressBar, -} - -impl ProgressPrinter { - pub fn new() -> Self { - Self { - pb: ProgressBar::new_spinner(), - } - } - - pub fn start_step(&mut self, step_no: u32, text: &str) { - self.pb.println(""); - - self.pb.set_message(format!( - "[{}] {}", - step_no + 1, - get_colored_text(text, YELLOW) - )); - } - - pub fn complete_step(&mut self, step_no: u32, text: &str) { - self.pb.println(format!( - " ✅ [{}] {}", - step_no + 1, - get_colored_text(text, GREEN) - )); - - self.pb.set_message("".to_string()); - } -} diff --git a/dozer-cli/src/simple/cloud/version.rs b/dozer-cli/src/simple/cloud/version.rs deleted file mode 100644 index 913380c3aa..0000000000 --- a/dozer-cli/src/simple/cloud/version.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::collections::HashMap; - -use dozer_api::rest::DOZER_SERVER_NAME_HEADER; -use dozer_cache::Phase; -use dozer_types::{ - prettytable::{row, table}, - serde_json, -}; - -use crate::errors::CloudError; - -#[derive(Debug)] -pub struct PathStatus { - count: usize, - phase: Phase, -} - -#[derive(Debug, Default)] -pub struct ServerStatus { - paths: HashMap, -} - -#[derive(Debug)] -pub struct VersionStatus { - servers: Vec, -} - -pub async fn get_version_status( - endpoint: &str, - version: u32, - api_available: i32, -) -> Result { - let (clients, paths) = probe_dozer_servers(endpoint, version, api_available).await?; - - let mut servers = vec![]; - for client in clients { - let mut server_status = ServerStatus::default(); - for path in &paths { - let count = client - .post(format!("{}/v{}{}/count", endpoint, version, path)) - .send() - .await? - .error_for_status()?; - let count = count.json::().await?; - - let phase = client - .post(format!("{}/v{}{}/phase", endpoint, version, path)) - .send() - .await? - .error_for_status()?; - let phase = phase.json::().await?; - - server_status - .paths - .insert(path.clone(), PathStatus { count, phase }); - } - servers.push(server_status) - } - - Ok(VersionStatus { servers }) -} - -/// Probe the servers to get a list of clients with sticky session cookie, and all the paths available. -async fn probe_dozer_servers( - endpoint: &str, - version: u32, - count_hint: i32, -) -> Result<(Vec, Vec), CloudError> { - let mut clients = HashMap::, reqwest::Client>::default(); - let mut paths = vec![]; - - // Try to visit the version endpoint many times to cover all the servers. - for _ in 0..count_hint * 5 { - let client = reqwest::Client::builder().cookie_store(true).build()?; - let response = client - .get(format!("{}/v{}/", endpoint, version)) - .send() - .await? - .error_for_status()?; - let server_name = response - .headers() - .get(DOZER_SERVER_NAME_HEADER) - .ok_or(CloudError::MissingResponseHeader)? - .as_bytes(); - - if clients.contains_key(server_name) { - continue; - } - clients.insert(server_name.to_vec(), client); - - if paths.is_empty() { - paths = response.json::>().await?; - } - } - - Ok((clients.into_values().collect(), paths)) -} - -pub fn version_status_table(status: &Result) -> String { - if let Ok(status) = status { - let mut table = table!(); - for server in &status.servers { - let mut server_table = table!(); - for (path, PathStatus { count, phase }) in &server.paths { - let phase = serde_json::to_string(phase).expect("Should always succeed"); - server_table.add_row(row![path, count, phase]); - } - table.add_row(row![server_table]); - } - table.to_string() - } else { - "Unavailable".to_string() - } -} - -pub fn version_is_up_to_date( - status: &Result, - current_status: &Result, -) -> bool { - if let Ok(status) = status { - if let Ok(current_status) = current_status { - version_is_up_to_date_impl(status, current_status) - } else { - true - } - } else { - false - } -} - -/// We say a version is up to date if any server of this version are up to date with any server of current version. -fn version_is_up_to_date_impl(status: &VersionStatus, current_status: &VersionStatus) -> bool { - for server in &status.servers { - for current_server in ¤t_status.servers { - if server_is_up_to_date(server, current_server) { - return true; - } - } - } - true -} - -/// We say a server is up to date if all paths are up to date. -fn server_is_up_to_date(status: &ServerStatus, current_status: &ServerStatus) -> bool { - for (path, status) in &status.paths { - if !path_is_up_to_date(status, current_status.paths.get(path)) { - return false; - } - } - true -} - -fn path_is_up_to_date(status: &PathStatus, current_status: Option<&PathStatus>) -> bool { - if let Some(current_status) = current_status { - status.phase == Phase::Streaming - || (current_status.phase == Phase::Snapshotting && status.count >= current_status.count) - } else { - true - } -} diff --git a/dozer-cli/src/simple/cloud_orchestrator.rs b/dozer-cli/src/simple/cloud_orchestrator.rs deleted file mode 100644 index d2f71a6236..0000000000 --- a/dozer-cli/src/simple/cloud_orchestrator.rs +++ /dev/null @@ -1,606 +0,0 @@ -use crate::cli::cloud::{ - Cloud, DeployCommandArgs, ListCommandArgs, LogCommandArgs, SecretsCommand, VersionCommand, -}; -use crate::cloud_app_context::CloudAppContext; -use crate::cloud_helper::list_files; - -use crate::errors::OrchestrationError::FailedToReadOrganisationName; -use crate::errors::{map_tonic_error, CliError, CloudError, CloudLoginError, OrchestrationError}; -use crate::simple::cloud::deployer::{deploy_app, stop_app}; -use crate::simple::cloud::login::CredentialInfo; -use crate::simple::cloud::monitor::monitor_app; -use crate::simple::token_layer::TokenLayer; -use crate::simple::SimpleOrchestrator; -use crate::CloudOrchestrator; -use dozer_types::constants::{DEFAULT_CLOUD_TARGET_URL, LOCK_FILE}; -use dozer_types::grpc_types::cloud::{ - dozer_cloud_client::DozerCloudClient, CreateSecretRequest, DeleteAppRequest, - DeleteSecretRequest, GetSecretRequest, GetStatusRequest, ListAppRequest, ListSecretsRequest, - LogMessageRequest, UpdateSecretRequest, -}; -use dozer_types::grpc_types::cloud::{ - DeploymentInfo, DeploymentStatusWithHealth, File, ListDeploymentRequest, - SetCurrentVersionRequest, UpsertVersionRequest, -}; -use dozer_types::log::info; -use dozer_types::prettytable::{row, table}; -use futures::{select, FutureExt, StreamExt}; -use std::io; -use tonic::transport::Endpoint; -use tower::ServiceBuilder; - -use super::cloud::login::LoginSvc; -use super::cloud::version::{get_version_status, version_is_up_to_date, version_status_table}; - -pub async fn get_cloud_client( - cloud: &Cloud, - cloud_config: Option<&dozer_types::models::cloud::Cloud>, -) -> Result, CloudError> { - let profile_name = match &cloud.profile { - None => cloud_config.as_ref().and_then(|c| c.profile.clone()), - Some(_) => cloud.profile.clone(), - }; - let credential = CredentialInfo::load(profile_name)?; - let target_url = cloud - .target_url - .as_ref() - .unwrap_or(&credential.target_url) - .clone(); - info!("Connecting to cloud service \"{}\"", target_url); - let endpoint = Endpoint::from_shared(target_url.to_owned())?; - let channel = Endpoint::connect(&endpoint).await?; - let channel = ServiceBuilder::new() - .layer_fn(|channel| TokenLayer::new(channel, credential.clone())) - .service(channel); - let client = DozerCloudClient::new(channel); - Ok(client) -} - -impl CloudOrchestrator for SimpleOrchestrator { - // TODO: Deploy Dozer application using local Dozer configuration - fn deploy( - &mut self, - cloud: Cloud, - deploy: DeployCommandArgs, - config_paths: Vec, - ) -> Result<(), OrchestrationError> { - let app_id = if cloud.app_id.is_some() { - cloud.app_id.clone() - } else { - let app_id_from_context = CloudAppContext::get_app_id(self.config.cloud.as_ref()); - match app_id_from_context { - Ok(id) => Some(id), - Err(_) => None, - } - }; - - let cloud_config = self.config.cloud.as_ref(); - let lockfile_path = self.lockfile_path(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - let mut files = list_files(config_paths)?; - if deploy.locked { - let lockfile_contents = tokio::fs::read_to_string(lockfile_path) - .await - .map_err::(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - CloudError::LockfileNotFound.into() - } else { - CliError::Io(e).into() - } - })?; - let lockfile = File { - name: LOCK_FILE.to_owned(), - content: lockfile_contents, - }; - files.push(lockfile); - } - - // 2. START application - deploy_app( - &mut client, - &app_id, - deploy.secrets, - deploy.allow_incompatible, - files, - deploy.follow, - ) - .await?; - Ok::<(), OrchestrationError>(()) - })?; - Ok(()) - } - - fn delete(&mut self, cloud: Cloud) -> Result<(), OrchestrationError> { - // Get app_id from command line argument if there, otherwise take it from the cloud config file - // if the app_id is from the cloud config file then set `delete_cloud_file` to true and use it later - // to delete the file after deleting the app - - let (app_id, delete_cloud_file) = if let Some(app_id) = cloud.app_id.clone() { - // if the app_id on command line is equal to the one in the cloud config file then file can be deleted - if app_id == CloudAppContext::get_app_id(self.config.cloud.as_ref())? { - (app_id, true) - } else { - (app_id, false) - } - } else { - ( - CloudAppContext::get_app_id(self.config.cloud.as_ref())?, - true, - ) - }; - - let mut double_check = String::new(); - println!("Are you sure to delete the application {}? (y/N)", app_id); - io::stdin() - .read_line(&mut double_check) - .map_err(FailedToReadOrganisationName)?; - let response = double_check.trim().to_string().to_uppercase(); - - if response == "Y" { - info!("Deleting application {}", app_id); - } else { - info!("The application {} was not deleted", app_id); - return Ok(()); - } - - let cloud_config = self.config.cloud.as_ref(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - - stop_app(&mut client, &app_id).await?; - - // steps.start_next_step(); - let delete_result = client - .delete_application(DeleteAppRequest { - app_id: app_id.clone(), - }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - if delete_result.success { - info!("Deleted {}", &app_id); - - if delete_cloud_file { - let _ = CloudAppContext::delete_config_file(); - } - } - - Ok::<(), CloudError>(()) - })?; - - Ok(()) - } - - fn list(&mut self, cloud: Cloud, list: ListCommandArgs) -> Result<(), OrchestrationError> { - let cloud_config = self.config.cloud.as_ref(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - let response = client - .list_applications(ListAppRequest { - limit: list.limit, - offset: list.offset, - name: list.name, - uuid: list.uuid, - order_by: None, - desc: None, - }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - let mut table = table!(); - - for app in response.apps { - if let Some(app_data) = app.app { - table.add_row(row![app.app_id, app_data.convert_to_table()]); - } - } - - table.printstd(); - - info!( - "Total apps: {}", - response - .pagination - .map_or_else(|| 0, |pagination| pagination.total) - ); - - Ok::<(), CloudError>(()) - })?; - - Ok(()) - } - - fn status(&mut self, cloud: Cloud) -> Result<(), OrchestrationError> { - let app_id = cloud - .app_id - .clone() - .unwrap_or(CloudAppContext::get_app_id(self.config.cloud.as_ref())?); - let cloud_config = self.config.cloud.as_ref(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - let response = client - .get_status(GetStatusRequest { app_id }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - let mut table = table!(); - - table.add_row(row!["Api endpoint", response.data_endpoint,]); - - let mut deployment_table = table!(); - deployment_table.set_titles(row![ - "Deployment", - "App", - "Api", - "Version", - "Phase", - "Error" - ]); - - for status in response.deployments.iter() { - let deployment = status.deployment.as_ref().expect("deployment is expected"); - fn mark(status: bool) -> &'static str { - if status { - "🟢" - } else { - "🟠" - } - } - - let mut version = "".to_string(); - for (loop_version, loop_deployment) in response.versions.iter() { - if loop_deployment == &deployment.deployment { - if Some(*loop_version) == response.current_version { - version = format!("v{loop_version} (current)"); - } else { - version = format!("v{loop_version}"); - } - break; - } - } - - deployment_table.add_row(row![ - deployment.deployment, - format!("Deployment Status: {:?}", deployment.status), - format!("Version: {}", version), - ]); - for r in status.resources.iter() { - deployment_table.add_row(row![ - "", - format!("{}: {}", r.typ, mark(r.available == r.desired)), - format!("{}/{}", r.available.unwrap_or(0), r.desired.unwrap_or(0)), - ]); - } - } - - table.add_row(row!["Deployments", deployment_table]); - - table.printstd(); - Ok::<(), CloudError>(()) - })?; - - Ok(()) - } - - fn monitor(&mut self, cloud: Cloud) -> Result<(), OrchestrationError> { - monitor_app(&cloud, self.config.cloud.as_ref(), self.runtime.clone()) - .map_err(crate::errors::OrchestrationError::CloudError) - } - - fn trace_logs(&mut self, cloud: Cloud, logs: LogCommandArgs) -> Result<(), OrchestrationError> { - let app_id = cloud - .app_id - .clone() - .unwrap_or(CloudAppContext::get_app_id(self.config.cloud.as_ref())?); - let cloud_config = self.config.cloud.as_ref(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - - let res = client - .list_deployments(ListDeploymentRequest { - app_id: app_id.clone(), - }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - // Show log of the latest deployment for now. - let Some(deployment) = logs - .deployment - .or_else(|| latest_deployment(&res.deployments)) - else { - info!("No deployments found"); - return Ok(()); - }; - let mut response = client - .on_log_message(LogMessageRequest { - app_id, - deployment, - follow: logs.follow, - include_build: !logs.ignore_build, - include_app: !logs.ignore_app, - include_api: !logs.ignore_api, - }) - .await - .map_err(map_tonic_error)? - .into_inner() - .fuse(); - - let mut ctrlc = std::pin::pin!(tokio::signal::ctrl_c().fuse()); - loop { - select! { - message = response.next() => { - if let Some(message) = message { - let message = message?; - for line in message.message.lines() { - info!("[{}] {line}", message.from); - } - } else { - break; - } - } - _ = ctrlc => { - break; - } - }; - } - - Ok::<(), CloudError>(()) - })?; - - Ok(()) - } - - fn login( - &mut self, - cloud: Cloud, - organisation_slug: Option, - profile: Option, - client_id: Option, - client_secret: Option, - ) -> Result<(), OrchestrationError> { - info!("Organisation and client details can be created in https://dashboard.dev.getdozer.io/login \n"); - let organisation_slug = match organisation_slug { - None => { - let mut organisation_slug = String::new(); - println!("Please enter your organisation slug:"); - io::stdin() - .read_line(&mut organisation_slug) - .map_err(FailedToReadOrganisationName)?; - organisation_slug.trim().to_string() - } - Some(name) => name, - }; - - self.runtime.block_on(async move { - let login_svc = LoginSvc::new( - organisation_slug, - cloud - .target_url - .unwrap_or(DEFAULT_CLOUD_TARGET_URL.to_string()), - ) - .await?; - login_svc.login(profile, client_id, client_secret).await?; - Ok::<(), CloudLoginError>(()) - })?; - Ok(()) - } - - fn execute_secrets_command( - &mut self, - cloud: Cloud, - command: SecretsCommand, - ) -> Result<(), OrchestrationError> { - let app_id = cloud - .app_id - .clone() - .unwrap_or(CloudAppContext::get_app_id(self.config.cloud.as_ref())?); - let cloud_config = self.config.cloud.as_ref(); - - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - - match command { - SecretsCommand::Create { name, value } => { - client - .create_secret(CreateSecretRequest { - app_id, - name, - value, - }) - .await - .map_err(map_tonic_error)?; - - info!("Secret created"); - } - SecretsCommand::Update { name, value } => { - client - .update_secret(UpdateSecretRequest { - app_id, - name, - value, - }) - .await - .map_err(map_tonic_error)?; - - info!("Secret updated"); - } - SecretsCommand::Delete { name } => { - client - .delete_secret(DeleteSecretRequest { app_id, name }) - .await - .map_err(map_tonic_error)?; - - info!("Secret deleted") - } - SecretsCommand::Get { name } => { - let response = client - .get_secret(GetSecretRequest { app_id, name }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - info!("Secret \"{}\" exist", response.name); - } - SecretsCommand::List {} => { - let response = client - .list_secrets(ListSecretsRequest { app_id }) - .await - .map_err(map_tonic_error)? - .into_inner(); - - info!("Secrets:"); - let mut table = table!(); - - for secret in response.secrets { - table.add_row(row![secret]); - } - - table.printstd(); - } - } - Ok::<_, CloudError>(()) - })?; - - Ok(()) - } -} - -impl SimpleOrchestrator { - pub fn version( - &mut self, - cloud: Cloud, - version: VersionCommand, - ) -> Result<(), OrchestrationError> { - let app_id = cloud - .app_id - .clone() - .unwrap_or(CloudAppContext::get_app_id(self.config.cloud.as_ref())?); - - let cloud_config = self.config.cloud.as_ref(); - self.runtime.block_on(async move { - let mut client = get_cloud_client(&cloud, cloud_config).await?; - - match version { - VersionCommand::Create { deployment } => { - let status = client - .get_status(GetStatusRequest { - app_id: app_id.clone(), - }) - .await - .map_err(map_tonic_error)? - .into_inner(); - let latest_version = status.versions.into_values().max().unwrap_or(0); - - client - .upsert_version(UpsertVersionRequest { - app_id, - version: latest_version + 1, - deployment, - }) - .await - .map_err(map_tonic_error)?; - } - VersionCommand::SetCurrent { version } => { - client - .set_current_version(SetCurrentVersionRequest { app_id, version }) - .await?; - } - VersionCommand::Status { version } => { - let status = client - .get_status(GetStatusRequest { app_id }) - .await - .map_err(map_tonic_error)? - .into_inner(); - let Some(deployment) = status.versions.get(&version) else { - info!("Version {} does not exist", version); - return Ok(()); - }; - let api_available = get_api_available(&status.deployments, *deployment); - - let version_status = - get_version_status(&status.data_endpoint, version, api_available).await; - let mut table = table!(); - - if let Some(current_version) = status.current_version { - if current_version != version { - let current_api_available = get_api_available( - &status.deployments, - status.versions[¤t_version], - ); - - table.add_row(row![ - format!("v{version}"), - version_status_table(&version_status) - ]); - - let current_version_status = get_version_status( - &status.data_endpoint, - current_version, - current_api_available, - ) - .await; - table.add_row(row![ - format!("v{current_version} (current)"), - version_status_table(¤t_version_status) - ]); - - table.printstd(); - - if version_is_up_to_date(&version_status, ¤t_version_status) { - info!("Version {} is up to date", version); - } else { - info!("Version {} is not up to date", version); - } - } else { - table.add_row(row![ - format!("v{version} (current)"), - version_status_table(&version_status) - ]); - table.printstd(); - } - } else { - table.add_row(row![ - format!("v{version}"), - version_status_table(&version_status) - ]); - table.printstd(); - info!("No current version"); - }; - } - } - - Ok::<_, CloudError>(()) - })?; - Ok(()) - } -} - -fn latest_deployment(deployments: &[DeploymentInfo]) -> Option { - deployments.iter().map(|status| status.deployment).max() -} - -fn get_api_available(deployments: &[DeploymentStatusWithHealth], deployment: u32) -> i32 { - let info = deployments - .iter() - .find(|status| { - status - .deployment - .as_ref() - .expect("deployment is expected") - .deployment - == deployment - }) - .expect("Deployment should be found in deployments"); - - info.resources - .clone() - .into_iter() - .find(|r| r.typ == "api") - .and_then(|r| r.available) - .unwrap_or(1) -} diff --git a/dozer-cli/src/simple/mod.rs b/dozer-cli/src/simple/mod.rs index 71202e79fe..f62118a27a 100644 --- a/dozer-cli/src/simple/mod.rs +++ b/dozer-cli/src/simple/mod.rs @@ -3,10 +3,5 @@ pub mod orchestrator; pub use orchestrator::SimpleOrchestrator; mod build; pub use build::{Contract, PipelineContract}; -#[cfg(feature = "cloud")] -mod cloud; -#[cfg(feature = "cloud")] -mod cloud_orchestrator; + pub mod helper; -#[cfg(feature = "cloud")] -mod token_layer; diff --git a/dozer-cli/src/simple/token_layer.rs b/dozer-cli/src/simple/token_layer.rs deleted file mode 100644 index a739dc74c9..0000000000 --- a/dozer-cli/src/simple/token_layer.rs +++ /dev/null @@ -1,49 +0,0 @@ -use super::cloud::login::CredentialInfo; -use http::{Request, Response}; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tonic::body::BoxBody; -use tonic::codegen::http; -use tonic::transport::Body; -use tonic::transport::Channel; -use tower::Service; - -pub struct TokenLayer { - inner: Channel, - credential: CredentialInfo, -} - -impl TokenLayer { - pub fn new(inner: Channel, credential: CredentialInfo) -> Self { - TokenLayer { inner, credential } - } -} - -impl Service> for TokenLayer { - type Response = Response; - type Error = Box; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: Request) -> Self::Future { - let clone = self.inner.clone(); - let mut inner = std::mem::replace(&mut self.inner, clone); - let credential = self.credential.clone(); - Box::pin(async move { - // Do extra async work here... - let token = credential.get_access_token().await?; - let mut new_request = req; - new_request.headers_mut().insert( - http::header::AUTHORIZATION, - format!("Bearer {}", token.access_token).parse().unwrap(), - ); - let response = inner.call(new_request).await?; - Ok(response) - }) - } -} diff --git a/dozer-types/build.rs b/dozer-types/build.rs index 99d5fa6416..3e7992b4e9 100644 --- a/dozer-types/build.rs +++ b/dozer-types/build.rs @@ -46,70 +46,5 @@ fn main() -> Result<(), Box> { .file_descriptor_set_path(out_dir.join("generated_films.bin")) .compile(&["protos/films.proto"], &["protos"])?; - // Cloud Service & Types - tonic_build::configure() - .protoc_arg("--experimental_allow_proto3_optional") - .extern_path( - ".dozer.cloud.Endpoint", - "crate::models::api_endpoint::ApiEndpoint", - ) - .extern_path(".dozer.cloud.Source", "crate::models::source::Source") - .extern_path(".dozer.cloud.AppConfig", "crate::models::config::Config") - .extern_path( - ".dozer.cloud.ConnectionConfig", - "crate::models::connection::ConnectionConfig", - ) - .extern_path( - ".dozer.cloud.Connection", - "crate::models::connection::Connection", - ) - .extern_path( - ".dozer.cloud.EthContract", - "crate::ingestion_types::EthContract", - ) - .extern_path( - ".dozer.cloud.EthereumFilter", - "crate::ingestion_types::EthereumFilter", - ) - .extern_path( - ".dozer.cloud.DeltaLakeConfig", - "crate::ingestion_types::DeltaLakeConfig", - ) - .extern_path( - ".dozer.cloud.LocalStorage", - "crate::ingestion_types::LocalStorage", - ) - .extern_path( - ".dozer.cloud.S3Storage", - "crate::ingestion_types::S3Storage", - ) - .extern_path( - ".dozer.cloud.KafkaConfig", - "crate::ingestion_types::KafkaConfig", - ) - .extern_path( - ".dozer.cloud.SnowflakeConfig", - "crate::ingestion_types::SnowflakeConfig", - ) - .extern_path( - ".dozer.cloud::grpc_config::Schemas", - "crate::ingestion_types::GrpcConfigSchemas", - ) - .extern_path( - ".dozer.cloud.GrpcConfig", - "crate::ingestion_types::GrpcConfig", - ) - .extern_path( - ".dozer.cloud.EthereumConfig", - "crate::ingestion_types::EthConfig", - ) - .extern_path( - ".dozer.cloud.PostgresConfig", - "crate::models::connection::PostgresConfig", - ) - .file_descriptor_set_path(out_dir.join("cloud.bin")) - .compile(&["protos/cloud.proto"], &["protos"]) - .unwrap(); - Ok(()) } diff --git a/dozer-types/src/grpc_types.rs b/dozer-types/src/grpc_types.rs index 785bf39d33..79c13992d8 100644 --- a/dozer-types/src/grpc_types.rs +++ b/dozer-types/src/grpc_types.rs @@ -27,22 +27,6 @@ pub mod ingest { pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("ingest"); } -pub mod cloud { - #![allow(clippy::derive_partial_eq_without_eq, clippy::large_enum_variant)] - tonic::include_proto!("dozer.cloud"); - pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("cloud"); - use crate::chrono::NaiveDateTime; - use prost_types::Timestamp; - pub fn naive_datetime_to_timestamp(naive_dt: NaiveDateTime) -> Timestamp { - let unix_timestamp = naive_dt.timestamp(); // Get the UNIX timestamp (seconds since epoch) - let nanos = naive_dt.timestamp_subsec_nanos() as i32; // Get nanoseconds part - prost_types::Timestamp { - seconds: unix_timestamp, - nanos, - } - } -} - pub mod contract { #![allow(clippy::derive_partial_eq_without_eq)] tonic::include_proto!("dozer.contract"); diff --git a/dozer-types/src/lib.rs b/dozer-types/src/lib.rs index fcb2d62c80..39a6bf18b4 100644 --- a/dozer-types/src/lib.rs +++ b/dozer-types/src/lib.rs @@ -46,3 +46,16 @@ pub use serde_json; pub use serde_yaml; pub use thiserror; pub use tracing; + +// Types required by API and other clients +pub mod api { + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] + pub enum Phase { + Snapshotting, + Streaming, + } + + pub const DOZER_SERVER_NAME_HEADER: &str = "x-dozer-server-name"; +}