From b859ac75aa413f2aca69775fb7ced3c46b9e731f Mon Sep 17 00:00:00 2001 From: solidiquis Date: Mon, 10 Jun 2024 14:22:48 -0700 Subject: [PATCH] docs: Add helpful comments for rust ingestion example --- examples/rust/ingestion_with_config/src/ingestion.rs | 8 +++++--- examples/rust/ingestion_with_config/src/main.rs | 9 ++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/rust/ingestion_with_config/src/ingestion.rs b/examples/rust/ingestion_with_config/src/ingestion.rs index 177a02be..f71060f0 100644 --- a/examples/rust/ingestion_with_config/src/ingestion.rs +++ b/examples/rust/ingestion_with_config/src/ingestion.rs @@ -74,6 +74,7 @@ impl SiftIngestionService { pub async fn send_values(&mut self, flows: Vec) -> Result<()> { let mut requests = Vec::new(); + // Turn our flows into request objects for flow in flows { let channel_values = flow .channel_values @@ -89,18 +90,19 @@ impl SiftIngestionService { channel_values, timestamp: Some(Timestamp::from(flow.timestamp)), flow: flow.name, + ingestion_config_id: self.ingestion_config.ingestion_config_id.clone(), + organization_id: self.organization_id.clone().unwrap_or_default(), + end_stream_on_validation_error: self.end_stream_on_error, run_id: self .run .as_ref() .map_or_else(String::new, |r| r.run_id.clone()), - ingestion_config_id: self.ingestion_config.ingestion_config_id.clone(), - organization_id: self.organization_id.clone().unwrap_or_default(), - end_stream_on_validation_error: self.end_stream_on_error, }; requests.push(request); } + // Stream requests and ignest data let stream = tokio_stream::iter(requests); let _ = self.client.ingest_with_config_data_stream(stream).await; Ok(()) diff --git a/examples/rust/ingestion_with_config/src/main.rs b/examples/rust/ingestion_with_config/src/main.rs index fd472707..5e4d21a1 100644 --- a/examples/rust/ingestion_with_config/src/main.rs +++ b/examples/rust/ingestion_with_config/src/main.rs @@ -32,22 +32,28 @@ async fn simulate() -> Result<()> { let _ = dotenv()?; let current_dir = env::current_dir()?; let config_name = env::var(CONFIG_ENV_VAR).map(|c| current_dir.join(CONFIG_DIR_NAME).join(c))?; + + // Load in the lunar_rover0.yml file and parse to TelemetryConfig let telemetry_config = TelemetryConfig::from_config(config_name)?; + let asset_name = telemetry_config.asset_name.clone(); + // Initialize gRPC transport channel let channel = grpc::use_channel()?; + let mut ingestion_service = SiftIngestionService::from_config(channel.clone(), telemetry_config).await?; ingestion_service.end_stream_on_error(); let mut current_time = Utc::now(); - // Start a run + // OPTIONAL: Start a run let run_name = format!( "{}.{}", asset_name.to_ascii_lowercase(), current_time.timestamp() ); + let _ = ingestion_service .start_run( channel.clone(), @@ -62,6 +68,7 @@ async fn simulate() -> Result<()> { let mut flows = Vec::new(); + // Create the actual flows i.e. values that we're going to ingest for i in 0..100 { let flow = Flow { // Will be validated downstream... value is from configs/lunar_rover0.yml.