Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to modify/remove an existing entry from LogRecord attributes #2103

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
- Update `async-std` dependency version to 1.13
- *Breaking* - Remove support for `MetricProducer` which allowed metrics from
external sources to be sent through OpenTelemetry.
[#2105](https://github.com/open-telemetry/opentelemetry-rust/pull/2105)
[#2105](https://github.com/open-telemetry/opentelemetry-rust/pull/2105)
- Added Two new methods to the LogRecord struct's public API:
```rust
update_attribute(&Key, &AnyValue) -> Option<AnyValue>
```
- Updates the value of the first occurrence of an attribute with the specified key.
- If the key exists, the old value is returned. If not, the new key-value pair is added, and None is returned.
```rust
remove_attribute(&mut self, key: &Key) -> usize
```
- Removes all occurrences of attributes with the specified key and returns the count of deleted attributes.

## v0.25.0

Expand Down
209 changes: 209 additions & 0 deletions opentelemetry-sdk/src/growable_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,71 @@
self.count + self.overflow.as_ref().map_or(0, Vec::len)
}

/// Removes the element at a specific position (index) while preserving the order.
///
/// This function performs the following operations:
///
/// - If the index points to an element in the internal array (`inline`):
/// - Removes the element at the specified index.
/// - Shifts the remaining elements in the internal array to the left to fill the gap, preserving the order.
/// - If an overflow vector exists:
/// - Moves the first element from the overflow vector into the last position of the internal array, if available.
/// - If the index points to an element in the overflow vector:
/// - The element is removed directly from the overflow vector.
///
/// # Arguments
///
/// - `index`: The index of the element to be deleted.
///
/// # Returns
///
/// - `Some(T)`: The deleted element, if found.
/// - `None`: If the index is out of bounds for both the internal array and the overflow vector.
///
#[allow(dead_code)]
pub(crate) fn remove_at(&mut self, index: usize) -> Option<T> {
if index < self.count {
let removed_value = self.inline[index].clone();

// Shift elements in inline array to the left
for i in index..self.count - 1 {
self.inline[i] = self.inline[i + 1].clone();
}

// Handle moving an overflow element to inline, if available
let moved_from_overflow = if let Some(ref mut overflow) = self.overflow {
if let Some(first_from_overflow) = overflow.first().cloned() {
self.inline[self.count - 1] = first_from_overflow;
overflow.remove(0); // Remove the first element from overflow
true
} else {
self.inline[self.count - 1] = Default::default();
false

Check warning on line 129 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L128-L129

Added lines #L128 - L129 were not covered by tests
}
} else {
self.inline[self.count - 1] = Default::default();
false
};

// Only decrement count if no item was moved from overflow
if !moved_from_overflow {
self.count -= 1;
}
return Some(removed_value);
}

// Handle removing from the overflow vector
if let Some(ref mut overflow) = self.overflow {
let overflow_index = index - MAX_INLINE_CAPACITY;
if overflow_index < overflow.len() {
return Some(overflow.remove(overflow_index));
}

Check warning on line 148 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L148

Added line #L148 was not covered by tests
}

// Index out of bounds
None
}

/// Returns an iterator over the elements in the `GrowableArray`.
///
/// The iterator yields elements from the internal array (`initial`) first, followed by elements
Expand All @@ -106,6 +171,26 @@
.chain(self.overflow.as_ref().unwrap().iter())
}
}

/// Returns a mutable iterator over the elements in the `GrowableArray`.
///
/// The iterator yields elements from the internal array (`initial`) first, followed by elements
/// from the vector (`overflow`) if present. This allows for efficient iteration over both
/// stack-allocated and heap-allocated portions.
///
#[allow(dead_code)]
#[inline]
pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() {
self.inline.iter_mut().take(self.count).chain([].iter_mut()) // Chaining with an empty array
// so that both `if` and `else` branch return the same type
} else {
self.inline
.iter_mut()
.take(self.count)
.chain(self.overflow.as_mut().unwrap().iter_mut())

Check warning on line 191 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L188-L191

Added lines #L188 - L191 were not covered by tests
}
}
}

// Implement `IntoIterator` for `GrowableArray`
Expand Down Expand Up @@ -371,4 +456,128 @@
}
assert_eq!(iter.next(), None);
}

#[test]
fn test_mut_iter_all_cases() {
let mut collection = GrowableArray::<i32>::new();

// Case 1: Try to modify values in an empty list
for value in collection.iter_mut() {
*value *= 2; // This should not be executed
}

Check warning on line 467 in opentelemetry-sdk/src/growable_array.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/growable_array.rs#L466-L467

Added lines #L466 - L467 were not covered by tests
assert_eq!(collection.len(), 0);
assert_eq!(collection.get(0), None);

// Case 2: Add a single element and modify it
collection.push(5);
for value in collection.iter_mut() {
*value *= 2;
}
assert_eq!(collection.get(0), Some(&10));
assert_eq!(collection.len(), 1);

// Case 3: Add more elements and modify them
for i in 1..10 {
collection.push(i);
}
for (i, value) in collection.iter_mut().enumerate() {
*value = i as i32 * 3; // Set values to i * 3
}
for i in 0..10 {
assert_eq!(collection.get(i), Some(&(i as i32 * 3)));
}
}
#[test]
fn test_remove_at_inline() {
let mut collection = GrowableArray::<i32>::new();
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY);

// Remove a value from the inline array using remove_at
let removed = collection.remove_at(3);
assert_eq!(removed, Some(3));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 1);

// Ensure the array shifted correctly and the value was removed
for i in 0..3 {
assert_eq!(collection.get(i), Some(&(i as i32)));
}
for i in 3..collection.len() {
assert_eq!(collection.get(i), Some(&((i + 1) as i32)));
}

// Try to remove a value out of bounds
let non_existent = collection.remove_at(99);
assert_eq!(non_existent, None);
}

#[test]
fn test_remove_at_overflow() {
let mut collection = GrowableArray::<i32>::new();
// Fill inline array
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}
// Add elements to the overflow
for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 5) {
collection.push(i as i32);
}
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 5);

// Remove a value from the overflow vector using remove_at
let removed = collection.remove_at(DEFAULT_MAX_INLINE_CAPACITY + 2);
assert_eq!(removed, Some(12));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 4);

// Ensure the rest of the elements are in order
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
assert_eq!(collection.get(i), Some(&(i as i32)));
}
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY), Some(&10));
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 1), Some(&11));
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 2), Some(&13));
}

#[test]
fn test_remove_at_last_element() {
let mut collection = GrowableArray::<i32>::new();
collection.push(10);
assert_eq!(collection.len(), 1);

// Remove the only element in the collection using remove_at
let removed = collection.remove_at(0);
assert_eq!(removed, Some(10));
assert_eq!(collection.len(), 0);

// Ensure it's empty
assert_eq!(collection.get(0), None);
}

#[test]
fn test_remove_at_from_inline_and_replace_with_overflow() {
let mut collection = GrowableArray::<i32>::new();

// Fill inline array
for i in 0..DEFAULT_MAX_INLINE_CAPACITY {
collection.push(i as i32);
}

// Add overflow elements
for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 3) {
collection.push(i as i32);
}

// Before removing, ensure that the count is correct
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 3);

// Remove an inline value and ensure that an overflow value takes its place using remove_at
let removed = collection.remove_at(5);
assert_eq!(removed, Some(5));
assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 2);

// The last inline position should now be filled with the first overflow element
assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY - 1), Some(&10));
}
}
68 changes: 67 additions & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,11 @@
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);
record.add_attribute(
Key::from_static_str("key1"),
AnyValue::String("val1".into()),
);

// update body
record.body = Some("Updated by FirstProcessor".into());

Expand Down Expand Up @@ -853,6 +858,51 @@
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
record.update_attribute(
&Key::from_static_str("processed_by"),
AnyValue::from("SecondProcessor"),
);
let _ = record.remove_attribute(&Key::from_static_str("key1"));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

Check warning on line 878 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L876-L878

Added lines #L876 - L878 were not covered by tests

fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}

#[derive(Debug)]
struct ThirdProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
}

impl LogProcessor for ThirdProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("SecondProcessor".into())
));
record.update_attribute(
&Key::from_static_str("processed_by"),
AnyValue::from("ThirdProcessor"),
);
assert!(!record.attributes_contains(
&Key::from_static_str("key1"),
&AnyValue::String("value1".into())
));

let _ = record.remove_attribute(&Key::from_static_str("key1"));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
Expand All @@ -871,21 +921,27 @@
Ok(())
}
}

#[test]
fn test_log_data_modification_by_multiple_processors() {
let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
let third_processor_logs = Arc::new(Mutex::new(Vec::new()));

let first_processor = FirstProcessor {
logs: Arc::clone(&first_processor_logs),
};
let second_processor = SecondProcessor {
logs: Arc::clone(&second_processor_logs),
};
let third_processor = ThirdProcessor {
logs: Arc::clone(&third_processor_logs),
};

let logger_provider = LoggerProvider::builder()
.with_log_processor(first_processor)
.with_log_processor(second_processor)
.with_log_processor(third_processor)
.build();

let logger = logger_provider.logger("test-logger");
Expand All @@ -896,17 +952,23 @@

assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
assert_eq!(third_processor_logs.lock().unwrap().len(), 1);

let first_log = &first_processor_logs.lock().unwrap()[0];
let second_log = &second_processor_logs.lock().unwrap()[0];
let third_log = &third_processor_logs.lock().unwrap()[0];

assert!(first_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(second_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
&AnyValue::String("SecondProcessor".into())
));
assert!(third_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("ThirdProcessor".into())
));

assert!(
Expand All @@ -917,5 +979,9 @@
second_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
assert!(
third_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
}
}
Loading
Loading