Skip to content

Commit

Permalink
docs: Add helpful comments for rust ingestion example
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis committed Jun 10, 2024
1 parent be8047a commit b859ac7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
8 changes: 5 additions & 3 deletions examples/rust/ingestion_with_config/src/ingestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl SiftIngestionService {
pub async fn send_values(&mut self, flows: Vec<Flow>) -> Result<()> {
let mut requests = Vec::new();

// Turn our flows into request objects
for flow in flows {
let channel_values = flow
.channel_values
Expand All @@ -89,18 +90,19 @@ impl SiftIngestionService {
channel_values,
timestamp: Some(Timestamp::from(flow.timestamp)),
flow: flow.name,
ingestion_config_id: self.ingestion_config.ingestion_config_id.clone(),
organization_id: self.organization_id.clone().unwrap_or_default(),
end_stream_on_validation_error: self.end_stream_on_error,
run_id: self
.run
.as_ref()
.map_or_else(String::new, |r| r.run_id.clone()),
ingestion_config_id: self.ingestion_config.ingestion_config_id.clone(),
organization_id: self.organization_id.clone().unwrap_or_default(),
end_stream_on_validation_error: self.end_stream_on_error,
};

requests.push(request);
}

// Stream requests and ignest data
let stream = tokio_stream::iter(requests);
let _ = self.client.ingest_with_config_data_stream(stream).await;
Ok(())
Expand Down
9 changes: 8 additions & 1 deletion examples/rust/ingestion_with_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,28 @@ async fn simulate() -> Result<()> {
let _ = dotenv()?;
let current_dir = env::current_dir()?;
let config_name = env::var(CONFIG_ENV_VAR).map(|c| current_dir.join(CONFIG_DIR_NAME).join(c))?;

// Load in the lunar_rover0.yml file and parse to TelemetryConfig
let telemetry_config = TelemetryConfig::from_config(config_name)?;

let asset_name = telemetry_config.asset_name.clone();

// Initialize gRPC transport channel
let channel = grpc::use_channel()?;

let mut ingestion_service =
SiftIngestionService::from_config(channel.clone(), telemetry_config).await?;
ingestion_service.end_stream_on_error();

let mut current_time = Utc::now();

// Start a run
// OPTIONAL: Start a run
let run_name = format!(
"{}.{}",
asset_name.to_ascii_lowercase(),
current_time.timestamp()
);

let _ = ingestion_service
.start_run(
channel.clone(),
Expand All @@ -62,6 +68,7 @@ async fn simulate() -> Result<()> {

let mut flows = Vec::new();

// Create the actual flows i.e. values that we're going to ingest
for i in 0..100 {
let flow = Flow {
// Will be validated downstream... value is from configs/lunar_rover0.yml.
Expand Down

0 comments on commit b859ac7

Please sign in to comment.