Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to modify/remove an existing entry from LogRecord attributes #2103

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions opentelemetry-sdk/src/growable_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,72 @@
self.count + self.overflow.as_ref().map_or(0, Vec::len)
}

/// Deletes the element matching the given value from the array while preserving the order.
///
/// This function performs the following operations:
///
/// - Searches the internal array (`inline`) for the specified value.
/// - If the value is found in the internal array:
/// - Removes the value.
/// - Shifts the remaining elements in the array to the left to fill the gap, preserving the order.
/// - If an overflow vector exists:
/// - Moves the first element from the overflow vector into the last position of the internal array.
/// - If the value is not found in the internal array, searches the heap-allocated vector (`overflow`).
/// - If the value is found in the overflow vector, it is removed, and the remaining elements in the vector are shifted left to maintain order.
///
/// # Arguments
///
/// - `value`: A reference to the value to be deleted.
///
/// # Returns
///
/// - `Some(T)`: The deleted value, if found.
/// - `None`: If the value was found in neither the array nor the vector
///
#[allow(dead_code)]
pub(crate) fn delete_item(&mut self, item: &T) -> Option<T> {
// Search and remove from inline array
if let Some(index) = self.inline[..self.count].iter().position(|v| v == item) {
let removed_value = self.inline[index].clone();

// Shift elements to the left to fill the gap
for i in index..self.count - 1 {
self.inline[i] = self.inline[i + 1].clone();
}

// Check if we can move an element from the overflow into the inline array
let moved_from_overflow = if let Some(ref mut overflow) = self.overflow {
if let Some(first_from_overflow) = overflow.first().cloned() {
self.inline[self.count - 1] = first_from_overflow;
overflow.remove(0); // Remove the first element from overflow
true
} else {
self.inline[self.count - 1] = Default::default();
false

Check warning on line 131 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L130-L131

Added lines #L130 - L131 were not covered by tests
}
} else {
self.inline[self.count - 1] = Default::default();
false
};

// Only decrement count if no item was moved from the overflow
if !moved_from_overflow {
self.count -= 1;
}
return Some(removed_value);
}

// Search and remove from overflow vector
if let Some(ref mut overflow) = self.overflow {
if let Some(index) = overflow.iter().position(|v| v == item) {
return Some(overflow.remove(index));
}

Check warning on line 149 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L149

Added line #L149 was not covered by tests
}

// Value not found
None
}

/// Returns an iterator over the elements in the `GrowableArray`.
///
/// The iterator yields elements from the internal array (`initial`) first, followed by elements
Expand All @@ -106,6 +172,26 @@
.chain(self.overflow.as_ref().unwrap().iter())
}
}

/// Returns a mutable iterator over the elements in the `GrowableArray`.
///
/// The iterator yields elements from the internal array (`initial`) first, followed by elements
/// from the vector (`overflow`) if present. This allows for efficient iteration over both
/// stack-allocated and heap-allocated portions.
///
#[allow(dead_code)]
#[inline]
pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() {
self.inline.iter_mut().take(self.count).chain([].iter_mut()) // Chaining with an empty array
// so that both `if` and `else` branch return the same type
} else {
self.inline
.iter_mut()
.take(self.count)
.chain(self.overflow.as_mut().unwrap().iter_mut())

Check warning on line 192 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L189-L192

Added lines #L189 - L192 were not covered by tests
}
}
}

// Implement `IntoIterator` for `GrowableArray`
Expand Down Expand Up @@ -371,4 +457,169 @@
}
assert_eq!(iter.next(), None);
}

#[test]
fn test_mut_iter_all_cases() {
let mut collection = GrowableArray::<i32>::new();

// Case 1: Try to modify values in an empty list
for value in collection.iter_mut() {
*value *= 2; // This should not be executed
}

Check warning on line 468 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L467-L468

Added lines #L467 - L468 were not covered by tests
assert_eq!(collection.len(), 0);
assert_eq!(collection.get(0), None);

// Case 2: Add a single element and modify it
collection.push(5);
for value in collection.iter_mut() {
*value *= 2;
}
assert_eq!(collection.get(0), Some(&10));
assert_eq!(collection.len(), 1);

// Case 3: Add more elements and modify them
for i in 1..10 {
collection.push(i);
}
for (i, value) in collection.iter_mut().enumerate() {
*value = i as i32 * 3; // Set values to i * 3
}
for i in 0..10 {
assert_eq!(collection.get(i), Some(&(i as i32 * 3)));
}
}
#[test]
fn test_delete_by_value_from_inline() {
let mut collection = GrowableArray::<i32>::new();
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY);

// Delete a value from the inline array
let removed = collection.delete_item(&3);
assert_eq!(removed, Some(3));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 1);

// Ensure the array shifted correctly and the value was removed
for i in 0..3 {
assert_eq!(collection.get(i), Some(&(i as i32)));
}
for i in 3..collection.len() {
assert_eq!(collection.get(i), Some(&((i + 1) as i32)));
}

// Try to delete a value not in the array
let non_existent = collection.delete_item(&99);
assert_eq!(non_existent, None);
}

#[test]
fn test_delete_by_value_from_overflow() {
let mut collection = GrowableArray::<i32>::new();
// Fill inline array
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}
// Add elements to the overflow
for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 5) {
collection.push(i as i32);
}
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 5);

// Delete a value from the overflow vector
let removed = collection.delete_item(&12);
assert_eq!(removed, Some(12));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 4);

// Ensure the rest of the elements are in order
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
assert_eq!(collection.get(i), Some(&(i as i32)));
}
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY), Some(&10));
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 1), Some(&11));
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 2), Some(&13));
}

#[test]
fn test_delete_last_element() {
let mut collection = GrowableArray::<i32>::new();
collection.push(10);
assert_eq!(collection.len(), 1);

// Delete the only element in the collection
let removed = collection.delete_item(&10);
assert_eq!(removed, Some(10));
assert_eq!(collection.len(), 0);

// Ensure it's empty
assert_eq!(collection.get(0), None);
}

#[test]
fn test_delete_multiple_values() {
let mut collection = GrowableArray::<i32>::new();
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}

// Delete multiple values
assert_eq!(collection.delete_item(&2), Some(2));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 1);
assert_eq!(collection.delete_item(&4), Some(4));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 2);

// Ensure the elements are still correct
assert_eq!(collection.get(2), Some(&3));
assert_eq!(collection.get(3), Some(&5));
}

#[test]
fn test_delete_by_value_empty_array() {
let mut collection = GrowableArray::<i32>::new();

// Try to delete from an empty array
let removed = collection.delete_item(&5);
assert_eq!(removed, None);
assert_eq!(collection.len(), 0);
}

#[test]
fn test_delete_by_value_not_in_array() {
let mut collection = GrowableArray::<i32>::new();
collection.push(1);
collection.push(2);
collection.push(3);

// Try to delete a value not present
let removed = collection.delete_item(&10);
assert_eq!(removed, None);
assert_eq!(collection.len(), 3);
}

#[test]
fn test_delete_from_inline_and_replace_with_overflow() {
let mut collection = GrowableArray::<i32>::new();

// Fill inline array
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
} // [0,1,2,3,4,5,6,7,8,9]

// Add overflow elements
for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 3) {
collection.push(i as i32);
} // [0,1,2,3,4,5,6,7,8,9,10,11,12]
// Before delete, ensure that the count is correct
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 3);

// Delete an inline value and ensure that an overflow value takes its place
let removed = collection.delete_item(&5); // Deleting from inline
assert_eq!(removed, Some(5));
// [0,1,2,3,4,6,7,8,9,10,11,12]
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 2);

// The last inline position should now be filled with the first overflow element
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY - 1), Some(&10));
}
}
68 changes: 67 additions & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,11 @@
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);
record.add_attribute(
Key::from_static_str("key1"),
AnyValue::String("val1".into()),
);

// update body
record.body = Some("Updated by FirstProcessor".into());

Expand Down Expand Up @@ -853,6 +858,51 @@
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
record.update_attribute(
&Key::from_static_str("processed_by"),
&AnyValue::from("SecondProcessor"),
);
let _ = record.delete_attribute(&Key::from_static_str("key1"));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

Check warning on line 878 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L876-L878

Added lines #L876 - L878 were not covered by tests

fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}

#[derive(Debug)]
struct ThirdProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
}

impl LogProcessor for ThirdProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("SecondProcessor".into())
));
record.update_attribute(
&Key::from_static_str("processed_by"),
&AnyValue::from("ThirdProcessor"),
);
assert!(!record.attributes_contains(
&Key::from_static_str("key1"),
&AnyValue::String("value1".into())
));

let _ = record.delete_attribute(&Key::from_static_str("key1"));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
Expand All @@ -871,21 +921,27 @@
Ok(())
}
}

#[test]
fn test_log_data_modification_by_multiple_processors() {
let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
let third_processor_logs = Arc::new(Mutex::new(Vec::new()));

let first_processor = FirstProcessor {
logs: Arc::clone(&first_processor_logs),
};
let second_processor = SecondProcessor {
logs: Arc::clone(&second_processor_logs),
};
let third_processor = ThirdProcessor {
logs: Arc::clone(&third_processor_logs),
};

let logger_provider = LoggerProvider::builder()
.with_log_processor(first_processor)
.with_log_processor(second_processor)
.with_log_processor(third_processor)
.build();

let logger = logger_provider.logger("test-logger");
Expand All @@ -896,17 +952,23 @@

assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
assert_eq!(third_processor_logs.lock().unwrap().len(), 1);

let first_log = &first_processor_logs.lock().unwrap()[0];
let second_log = &second_processor_logs.lock().unwrap()[0];
let third_log = &third_processor_logs.lock().unwrap()[0];

assert!(first_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(second_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
&AnyValue::String("SecondProcessor".into())
));
assert!(third_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("ThirdProcessor".into())
));

assert!(
Expand All @@ -917,5 +979,9 @@
second_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
assert!(
third_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
}
}
Loading
Loading