Skip to content

Commit

Permalink
Merge pull request #5 from opencomputeproject/fix/atomics
Browse files Browse the repository at this point in the history
fix atomics usage; ordering
  • Loading branch information
mimir-d authored Oct 8, 2024
2 parents aa07128 + ab483c5 commit 9e7d4f0
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 213 deletions.
15 changes: 7 additions & 8 deletions src/output/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::clone::Clone;
use std::io;
use std::io::Write;
use std::path::Path;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use tokio::fs::File;
Expand Down Expand Up @@ -104,14 +104,13 @@ impl JsonEmitter {
let root = spec::Root {
artifact: object.clone(),
timestamp: now_tz,
seqno: self.next_sequence_no(),
seqno: self.incr_seqno(),
};
serde_json::json!(root)
}

fn next_sequence_no(&self) -> u64 {
self.seqno.fetch_add(1, atomic::Ordering::SeqCst);
self.seqno.load(atomic::Ordering::SeqCst)
fn incr_seqno(&self) -> u64 {
self.seqno.fetch_add(1, Ordering::AcqRel)
}

pub async fn emit(&self, object: &spec::RootImpl) -> Result<(), WriterError> {
Expand Down Expand Up @@ -140,7 +139,7 @@ mod tests {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand Down Expand Up @@ -168,14 +167,14 @@ mod tests {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 1
"sequenceNumber": 0
});
let expected_2 = json!({
"schemaVersion": {
"major": spec::SPEC_VERSION.0,
"minor": spec::SPEC_VERSION.1,
},
"sequenceNumber": 2
"sequenceNumber": 1
});

let buffer = Arc::new(Mutex::new(vec![]));
Expand Down
28 changes: 9 additions & 19 deletions src/output/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
// https://opensource.org/licenses/MIT.

use std::future::Future;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use serde_json::Map;
use serde_json::Value;
use tokio::sync::Mutex;

use crate::output as tv;
use crate::spec;
Expand All @@ -23,15 +22,15 @@ use tv::{dut, emitter, step};
pub struct MeasurementSeries {
emitter: Arc<step::StepEmitter>,

seq_no: Arc<Mutex<atomic::AtomicU64>>,
seq_no: Arc<atomic::AtomicU64>,
start: MeasurementSeriesStart,
}

impl MeasurementSeries {
pub(crate) fn new(series_id: &str, name: &str, emitter: Arc<step::StepEmitter>) -> Self {
Self {
emitter,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start: MeasurementSeriesStart::new(name, series_id),
}
}
Expand All @@ -42,20 +41,13 @@ impl MeasurementSeries {
) -> Self {
Self {
emitter,
seq_no: Arc::new(Mutex::new(atomic::AtomicU64::new(0))),
seq_no: Arc::new(atomic::AtomicU64::new(0)),
start,
}
}

async fn current_sequence_no(&self) -> u64 {
self.seq_no.lock().await.load(atomic::Ordering::SeqCst)
}

async fn increment_sequence_no(&self) {
self.seq_no
.lock()
.await
.fetch_add(1, atomic::Ordering::SeqCst);
fn incr_seqno(&self) -> u64 {
self.seq_no.fetch_add(1, Ordering::AcqRel)
}

/// Starts the measurement series.
Expand Down Expand Up @@ -109,7 +101,7 @@ impl MeasurementSeries {
pub async fn end(&self) -> Result<(), emitter::WriterError> {
let end = spec::MeasurementSeriesEnd {
series_id: self.start.series_id.clone(),
total_count: self.current_sequence_no().await,
total_count: self.seq_no.load(Ordering::Acquire),
};

self.emitter
Expand Down Expand Up @@ -141,13 +133,12 @@ impl MeasurementSeries {
/// ```
pub async fn add_measurement(&self, value: Value) -> Result<(), emitter::WriterError> {
let element = spec::MeasurementSeriesElement {
index: self.current_sequence_no().await,
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
metadata: None,
};
self.increment_sequence_no().await;

self.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
Expand Down Expand Up @@ -185,15 +176,14 @@ impl MeasurementSeries {
metadata: Vec<(&str, Value)>,
) -> Result<(), emitter::WriterError> {
let element = spec::MeasurementSeriesElement {
index: self.current_sequence_no().await,
index: self.incr_seqno(),
value: value.clone(),
timestamp: chrono::Local::now().with_timezone(&chrono_tz::Tz::UTC),
series_id: self.start.series_id.clone(),
metadata: Some(Map::from_iter(
metadata.iter().map(|(k, v)| (k.to_string(), v.clone())),
)),
};
self.increment_sequence_no().await;

self.emitter
.emit(&spec::TestStepArtifactImpl::MeasurementSeriesElement(
Expand Down
10 changes: 4 additions & 6 deletions src/output/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.

use serde_json::Value;
use std::sync::atomic;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;

use crate::output as tv;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl TestStep {

Ok(StartedTestStep {
step: self,
measurement_id_no: Arc::new(atomic::AtomicU64::new(0)),
measurement_id_seqno: Arc::new(atomic::AtomicU64::new(0)),
})
}

Expand Down Expand Up @@ -107,7 +107,7 @@ impl TestStep {

pub struct StartedTestStep {
step: TestStep,
measurement_id_no: Arc<atomic::AtomicU64>,
measurement_id_seqno: Arc<atomic::AtomicU64>,
}

impl StartedTestStep {
Expand Down Expand Up @@ -464,11 +464,9 @@ impl StartedTestStep {
/// # });
/// ```
pub fn measurement_series(&self, name: &str) -> MeasurementSeries {
self.measurement_id_no
.fetch_add(1, atomic::Ordering::SeqCst);
let series_id: String = format!(
"series_{}",
self.measurement_id_no.load(atomic::Ordering::SeqCst)
self.measurement_id_seqno.fetch_add(1, Ordering::AcqRel)
);

MeasurementSeries::new(&series_id, name, Arc::clone(&self.step.emitter))
Expand Down
28 changes: 14 additions & 14 deletions tests/output/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -127,7 +127,7 @@ async fn test_ocptv_error_macro_with_symptom() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "error", |run| async move {
Expand All @@ -146,7 +146,7 @@ async fn test_ocptv_log_debug() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -166,7 +166,7 @@ async fn test_ocptv_log_info() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -185,7 +185,7 @@ async fn test_ocptv_log_warning() -> Result<()> {
"severity": "WARNING"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -204,7 +204,7 @@ async fn test_ocptv_log_error() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -223,7 +223,7 @@ async fn test_ocptv_log_fatal() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 3
"sequenceNumber": 2
});

check_output_run(&expected, "log", |run| async move {
Expand All @@ -242,7 +242,7 @@ async fn test_ocptv_error_macro_with_symptom_and_message_in_step() -> Result<()>
"symptom":"symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -260,7 +260,7 @@ async fn test_ocptv_error_macro_with_symptom_in_step() -> Result<()> {
"symptom": "symptom"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "error", |step| async move {
Expand All @@ -279,7 +279,7 @@ async fn test_ocptv_log_debug_in_step() -> Result<()> {
"severity": "DEBUG"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -298,7 +298,7 @@ async fn test_ocptv_log_info_in_step() -> Result<()> {
"severity": "INFO"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -317,7 +317,7 @@ async fn test_ocptv_log_warning_in_step() -> Result<()> {
"severity":"WARNING"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -336,7 +336,7 @@ async fn test_ocptv_log_error_in_step() -> Result<()> {
"severity": "ERROR"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand All @@ -355,7 +355,7 @@ async fn test_ocptv_log_fatal_in_step() -> Result<()> {
"severity": "FATAL"
}
},
"sequenceNumber": 4
"sequenceNumber": 3
});

check_output_step(&expected, "log", |step| async move {
Expand Down
Loading

0 comments on commit 9e7d4f0

Please sign in to comment.