diff --git a/Cargo.lock b/Cargo.lock index feecda7..404fe79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,19 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "env_logger" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "fnv" version = "1.0.7" @@ -597,6 +610,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.10" @@ -780,6 +799,7 @@ dependencies = [ "async-trait", "chrono", "criterion", + "env_logger", "futures-util", "log", "metrics", @@ -790,6 +810,7 @@ dependencies = [ "rusoto_core", "serde_json", "tokio", + "tokio-stream", ] [[package]] @@ -1560,6 +1581,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -1640,6 +1670,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6edf2d6bc038a43d31353570e27270603f4648d18f5ed10c0e179abe43255af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.7" diff --git a/Cargo.toml b/Cargo.toml index 1ce9680..37dbf5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,10 +28,12 @@ serde_json = "1" tokio = { version = "1", features = ["sync", "time"] } [dev-dependencies] +env_logger = "*" proptest = "1" async-trait = "0.1.24" criterion = { version = "0.3", features = ["async_tokio"] } tokio = { version = "1", features = ["test-util"] } +tokio-stream = "0.1.9" [features] default = ["native-tls"] diff --git a/src/builder.rs b/src/builder.rs index 1e1c56a..6a3a8f4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -2,6 +2,7 @@ use std::{collections::BTreeMap, fmt, pin::Pin}; use futures_util::{future, FutureExt, Stream}; use rusoto_cloudwatch::CloudWatch; +use tokio::sync::oneshot; use crate::{ collector::{self, Config, Resolution}, @@ -17,7 +18,7 @@ pub struct Builder { client: Box, shutdown_signal: Option>, metric_buffer_size: usize, - force_flush_stream: Option + Send>>>, + force_flush_stream: Option>> + Send>>>, } pub fn builder(region: rusoto_core::Region) -> Builder { @@ -56,7 +57,7 @@ impl Builder { /// held metric data in the same way as shutdown_signal will. pub fn force_flush_stream( mut self, - force_flush_stream: Pin + Send>>, + force_flush_stream: Pin>> + Send>>, ) -> Self { self.force_flush_stream = Some(force_flush_stream); self diff --git a/src/collector.rs b/src/collector.rs index 23fdb94..a973ec2 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -18,7 +18,7 @@ use { metrics::{GaugeValue, Key, Recorder, Unit}, rusoto_cloudwatch::{CloudWatch, Dimension, MetricDatum, PutMetricDataInput, StatisticSet}, rusoto_core::Region, - tokio::sync::mpsc, + tokio::sync::{mpsc, oneshot}, }; use crate::{error::Error, BoxFuture}; @@ -42,7 +42,7 @@ pub struct Config { pub client: Box, pub shutdown_signal: future::Shared>, pub metric_buffer_size: usize, - pub force_flush_stream: Option + Send>>>, + pub force_flush_stream: Option>> + Send>>>, } struct CollectorConfig { @@ -60,7 +60,8 @@ enum Message { Datum(Datum), SendBatch { send_all_before: Timestamp, - emit_sender: mpsc::Sender>, + emit_sender: mpsc::Sender<(Vec, Option>)>, + flush_signal: Option>, }, } @@ -164,13 +165,14 @@ pub fn new(mut config: Config) -> (RecorderHandle, impl Future) { .force_flush_stream .take() .unwrap_or_else(|| { - Box::pin(futures_util::stream::empty::<()>()) as Pin + Send>> + Box::pin(futures_util::stream::empty::>>()) as Pin>> + Send>> }) .map({ let emit_sender = emit_sender.clone(); - move |()| Message::SendBatch { + move |signal| Message::SendBatch { send_all_before: std::u64::MAX, emit_sender: emit_sender.clone(), + flush_signal: signal, } }); @@ -200,6 +202,7 @@ pub fn new(mut config: Config) -> (RecorderHandle, impl Future) { .accept(Message::SendBatch { send_all_before: std::u64::MAX, emit_sender, + flush_signal: None, }) .await; }; @@ -232,13 +235,13 @@ where } async fn mk_emitter( - mut emit_receiver: mpsc::Receiver>, + mut emit_receiver: mpsc::Receiver<(Vec, Option>)>, cloudwatch_client: Box, cloudwatch_namespace: String, ) { let cloudwatch_client = &cloudwatch_client; let cloudwatch_namespace = &cloudwatch_namespace; - while let Some(metrics) = emit_receiver.recv().await { + while let Some((metrics, flush_signal)) = emit_receiver.recv().await { let chunks: Vec<_> = metrics_chunks(&metrics).collect(); stream::iter(chunks) .for_each(|metric_data| async move { @@ -261,11 +264,17 @@ async fn mk_emitter( e, ), Err(tokio::time::error::Elapsed { .. }) => { - log::warn!("Failed to send metrics: send timeout") + log::warn!("Failed to send metrics: send timeout"); } } }) .await; + + if let Some(signal) = flush_signal { + if let Err(_) = signal.send(()) { + log::warn!("Unable to send flush complete signal"); + } + } } } @@ -353,7 +362,7 @@ fn jitter_interval_at( } fn mk_send_batch_timer( - emit_sender: mpsc::Sender>, + emit_sender: mpsc::Sender<(Vec, Option>)>, config: &Config, ) -> impl Stream { let interval = Duration::from_secs(config.send_interval_secs); @@ -363,6 +372,7 @@ fn mk_send_batch_timer( Message::SendBatch { send_all_before, emit_sender: emit_sender.clone(), + flush_signal: None, } }) } @@ -476,8 +486,9 @@ impl Collector { Message::SendBatch { send_all_before, emit_sender, + flush_signal, } => { - self.accept_send_batch(send_all_before, emit_sender)?; + self.accept_send_batch(send_all_before, emit_sender, flush_signal)?; break; } }, @@ -493,7 +504,8 @@ impl Collector { Message::SendBatch { send_all_before, emit_sender, - } => self.accept_send_batch(send_all_before, emit_sender), + flush_signal, + } => self.accept_send_batch(send_all_before, emit_sender, flush_signal), } }; if let Err(e) = result.await { @@ -550,12 +562,13 @@ impl Collector { fn accept_send_batch( &mut self, send_all_before: Timestamp, - emit_sender: mpsc::Sender>, + emit_sender: mpsc::Sender<(Vec, Option>)>, + flush_signal: Option>, ) -> Result<(), Box> { let mut range = self.metrics_data.split_off(&send_all_before); mem::swap(&mut range, &mut self.metrics_data); - let mut metrics_batch = vec![]; + let mut metrics_batch = (vec![], flush_signal); for (timestamp, stats_by_key) in range { let timestamp = timestamp_string(time::UNIX_EPOCH + Duration::from_secs(timestamp)); @@ -609,7 +622,7 @@ impl Collector { maximum: sum, minimum: sum, }; - metrics_batch.push(stats_set_datum(stats_set, unit.or(Some("Count")))); + metrics_batch.0.push(stats_set_datum(stats_set, unit.or(Some("Count")))); } if let Some(Gauge { @@ -625,7 +638,7 @@ impl Collector { minimum, maximum, }; - metrics_batch.push(stats_set_datum(statistic_set, unit)); + metrics_batch.0.push(stats_set_datum(statistic_set, unit)); } let histogram_datum = &mut |Histogram { values, counts }, unit| MetricDatum { @@ -656,12 +669,12 @@ impl Collector { if histogram.values.is_empty() { break; }; - metrics_batch.push(histogram_datum(histogram, unit.map(|s| s.into()))); + metrics_batch.0.push(histogram_datum(histogram, unit.map(|s| s.into()))); } } } } - if !metrics_batch.is_empty() { + if !metrics_batch.0.is_empty() { emit_sender.try_send(metrics_batch)?; } Ok(()) diff --git a/tests/test.rs b/tests/flush_on_shutdown.rs similarity index 100% rename from tests/test.rs rename to tests/flush_on_shutdown.rs diff --git a/tests/flush_signal.rs b/tests/flush_signal.rs new file mode 100644 index 0000000..9707d25 --- /dev/null +++ b/tests/flush_signal.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use futures_util::FutureExt; + +use common::MockCloudWatchClient; + +mod common; + +#[tokio::test] +async fn test_manual_flush() { + let _ = env_logger::try_init(); + + let client = MockCloudWatchClient::default(); + + tokio::time::pause(); + let (tx, rx) = tokio::sync::oneshot::channel(); + + let (metrics_force_flush_sender, metrics_force_flush_receiver) = + tokio::sync::mpsc::channel::>>(1); + let metrics_force_flush_receiver = + tokio_stream::wrappers::ReceiverStream::from(metrics_force_flush_receiver); + + let backend_fut = Box::pin( + metrics_cloudwatch::Builder::new_with(client.clone()) + .cloudwatch_namespace("test-ns") + .default_dimension("dimension", "default") + .send_interval_secs(100) + .storage_resolution(metrics_cloudwatch::Resolution::Second) + .shutdown_signal(Box::pin(rx.map(|_| ()))) + .force_flush_stream(Box::pin(metrics_force_flush_receiver)) + .init_future(metrics::set_boxed_recorder), + ); + + let join_handle = tokio::spawn(backend_fut); + tokio::time::advance(Duration::from_millis(5)).await; + + for i in 0..150 { + metrics::histogram!("test", i as f64); + metrics::counter!("count", 1); + metrics::gauge!("gauge", i as f64); + } + + { + let (tx, rx) = tokio::sync::oneshot::channel(); + + // Send flush with flush signal: + metrics_force_flush_sender.send(Some(tx)).await.unwrap(); + + // Wait for a flush signal: + match rx.await { + Ok(()) => {}, + _ => panic!("Expected a flush signal"), + } + + } + + tx.send(()).unwrap(); + join_handle.await.unwrap().unwrap(); +}