diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 729f343a55..f48d2fb1b9 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -158,7 +158,17 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope `experimental_logs_batch_log_processor_with_async_runtime`. - Continue enabling one of the async runtime feature flags: `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`. - + +- Added Two new methods to the LogRecord struct's public API: +```rust + update_attribute(&Key, &AnyValue) -> Option +``` + - Updates the value of the first occurrence of an attribute with the specified key. + - If the key exists, the old value is returned. If not, the new key-value pair is added, and None is returned. +```rust +remove_attribute(&mut self, key: &Key) -> usize +``` +- Removes all occurrences of attributes with the specified key and returns the count of deleted attributes. ## 0.27.1 Released 2024-Nov-27 @@ -275,6 +285,7 @@ Released 2024-Sep-30 - *Breaking* - Remove support for `MetricProducer` which allowed metrics from external sources to be sent through OpenTelemetry. [#2105](https://github.com/open-telemetry/opentelemetry-rust/pull/2105) + - Feature: `SimpleSpanProcessor::new` is now public [#2119](https://github.com/open-telemetry/opentelemetry-rust/pull/2119) - For Delta Temporality, exporters are not invoked unless there were new measurements since the last collect/export. diff --git a/opentelemetry-sdk/src/growable_array.rs b/opentelemetry-sdk/src/growable_array.rs index f174bedea2..433b923e75 100644 --- a/opentelemetry-sdk/src/growable_array.rs +++ b/opentelemetry-sdk/src/growable_array.rs @@ -87,6 +87,71 @@ impl< self.count + self.overflow.as_ref().map_or(0, Vec::len) } + /// Removes the element at a specific position (index) while preserving the order. + /// + /// This function performs the following operations: + /// + /// - If the index points to an element in the internal array (`inline`): + /// - Removes the element at the specified index. + /// - Shifts the remaining elements in the internal array to the left to fill the gap, preserving the order. + /// - If an overflow vector exists: + /// - Moves the first element from the overflow vector into the last position of the internal array, if available. + /// - If the index points to an element in the overflow vector: + /// - The element is removed directly from the overflow vector. + /// + /// # Arguments + /// + /// - `index`: The index of the element to be deleted. + /// + /// # Returns + /// + /// - `Some(T)`: The deleted element, if found. + /// - `None`: If the index is out of bounds for both the internal array and the overflow vector. + /// + #[allow(dead_code)] + pub(crate) fn remove_at(&mut self, index: usize) -> Option { + if index < self.count { + let removed_value = self.inline[index].clone(); + + // Shift elements in inline array to the left + for i in index..self.count - 1 { + self.inline[i] = self.inline[i + 1].clone(); + } + + // Handle moving an overflow element to inline, if available + let moved_from_overflow = if let Some(ref mut overflow) = self.overflow { + if let Some(first_from_overflow) = overflow.first().cloned() { + self.inline[self.count - 1] = first_from_overflow; + overflow.remove(0); // Remove the first element from overflow + true + } else { + self.inline[self.count - 1] = Default::default(); + false + } + } else { + self.inline[self.count - 1] = Default::default(); + false + }; + + // Only decrement count if no item was moved from overflow + if !moved_from_overflow { + self.count -= 1; + } + return Some(removed_value); + } + + // Handle removing from the overflow vector + if let Some(ref mut overflow) = self.overflow { + let overflow_index = index - MAX_INLINE_CAPACITY; + if overflow_index < overflow.len() { + return Some(overflow.remove(overflow_index)); + } + } + + // Index out of bounds + None + } + /// Returns an iterator over the elements in the `GrowableArray`. /// /// The iterator yields elements from the internal array (`initial`) first, followed by elements @@ -106,6 +171,26 @@ impl< .chain(self.overflow.as_ref().unwrap().iter()) } } + + /// Returns a mutable iterator over the elements in the `GrowableArray`. + /// + /// The iterator yields elements from the internal array (`initial`) first, followed by elements + /// from the vector (`overflow`) if present. This allows for efficient iteration over both + /// stack-allocated and heap-allocated portions. + /// + #[allow(dead_code)] + #[inline] + pub(crate) fn iter_mut(&mut self) -> impl Iterator { + if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() { + self.inline.iter_mut().take(self.count).chain([].iter_mut()) // Chaining with an empty array + // so that both `if` and `else` branch return the same type + } else { + self.inline + .iter_mut() + .take(self.count) + .chain(self.overflow.as_mut().unwrap().iter_mut()) + } + } } // Implement `IntoIterator` for `GrowableArray` @@ -371,4 +456,128 @@ mod tests { } assert_eq!(iter.next(), None); } + + #[test] + fn test_mut_iter_all_cases() { + let mut collection = GrowableArray::::new(); + + // Case 1: Try to modify values in an empty list + for value in collection.iter_mut() { + *value *= 2; // This should not be executed + } + assert_eq!(collection.len(), 0); + assert_eq!(collection.get(0), None); + + // Case 2: Add a single element and modify it + collection.push(5); + for value in collection.iter_mut() { + *value *= 2; + } + assert_eq!(collection.get(0), Some(&10)); + assert_eq!(collection.len(), 1); + + // Case 3: Add more elements and modify them + for i in 1..10 { + collection.push(i); + } + for (i, value) in collection.iter_mut().enumerate() { + *value = i as i32 * 3; // Set values to i * 3 + } + for i in 0..10 { + assert_eq!(collection.get(i), Some(&(i as i32 * 3))); + } + } + #[test] + fn test_remove_at_inline() { + let mut collection = GrowableArray::::new(); + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + collection.push(i as i32); + } + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY); + + // Remove a value from the inline array using remove_at + let removed = collection.remove_at(3); + assert_eq!(removed, Some(3)); + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 1); + + // Ensure the array shifted correctly and the value was removed + for i in 0..3 { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + for i in 3..collection.len() { + assert_eq!(collection.get(i), Some(&((i + 1) as i32))); + } + + // Try to remove a value out of bounds + let non_existent = collection.remove_at(99); + assert_eq!(non_existent, None); + } + + #[test] + fn test_remove_at_overflow() { + let mut collection = GrowableArray::::new(); + // Fill inline array + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + collection.push(i as i32); + } + // Add elements to the overflow + for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 5) { + collection.push(i as i32); + } + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 5); + + // Remove a value from the overflow vector using remove_at + let removed = collection.remove_at(DEFAULT_MAX_INLINE_CAPACITY + 2); + assert_eq!(removed, Some(12)); + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 4); + + // Ensure the rest of the elements are in order + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY), Some(&10)); + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 1), Some(&11)); + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY + 2), Some(&13)); + } + + #[test] + fn test_remove_at_last_element() { + let mut collection = GrowableArray::::new(); + collection.push(10); + assert_eq!(collection.len(), 1); + + // Remove the only element in the collection using remove_at + let removed = collection.remove_at(0); + assert_eq!(removed, Some(10)); + assert_eq!(collection.len(), 0); + + // Ensure it's empty + assert_eq!(collection.get(0), None); + } + + #[test] + fn test_remove_at_from_inline_and_replace_with_overflow() { + let mut collection = GrowableArray::::new(); + + // Fill inline array + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + collection.push(i as i32); + } + + // Add overflow elements + for i in DEFAULT_MAX_INLINE_CAPACITY..(DEFAULT_MAX_INLINE_CAPACITY + 3) { + collection.push(i as i32); + } + + // Before removing, ensure that the count is correct + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 3); + + // Remove an inline value and ensure that an overflow value takes its place using remove_at + let removed = collection.remove_at(5); + assert_eq!(removed, Some(5)); + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY + 2); + + // The last inline position should now be filled with the first overflow element + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY - 1), Some(&10)); + } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 47d410c381..836fdd2849 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1258,6 +1258,11 @@ mod tests { Key::from_static_str("processed_by"), AnyValue::String("FirstProcessor".into()), ); + record.add_attribute( + Key::from_static_str("key1"), + AnyValue::String("val1".into()), + ); + // update body record.body = Some("Updated by FirstProcessor".into()); @@ -1287,6 +1292,51 @@ mod tests { &Key::from_static_str("processed_by"), &AnyValue::String("FirstProcessor".into()) )); + record.update_attribute( + &Key::from_static_str("processed_by"), + AnyValue::from("SecondProcessor"), + ); + let _ = record.remove_attribute(&Key::from_static_str("key1")); + assert!( + record.body.clone().unwrap() + == AnyValue::String("Updated by FirstProcessor".into()) + ); + self.logs + .lock() + .unwrap() + .push((record.clone(), instrumentation.clone())); + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + Ok(()) + } + } + + #[derive(Debug)] + struct ThirdProcessor { + pub(crate) logs: Arc>>, + } + + impl LogProcessor for ThirdProcessor { + fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { + assert!(record.attributes_contains( + &Key::from_static_str("processed_by"), + &AnyValue::String("SecondProcessor".into()) + )); + record.update_attribute( + &Key::from_static_str("processed_by"), + AnyValue::from("ThirdProcessor"), + ); + assert!(!record.attributes_contains( + &Key::from_static_str("key1"), + &AnyValue::String("value1".into()) + )); + + let _ = record.remove_attribute(&Key::from_static_str("key1")); assert!( record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) @@ -1305,10 +1355,12 @@ mod tests { Ok(()) } } + #[test] fn test_log_data_modification_by_multiple_processors() { let first_processor_logs = Arc::new(Mutex::new(Vec::new())); let second_processor_logs = Arc::new(Mutex::new(Vec::new())); + let third_processor_logs = Arc::new(Mutex::new(Vec::new())); let first_processor = FirstProcessor { logs: Arc::clone(&first_processor_logs), @@ -1316,10 +1368,14 @@ mod tests { let second_processor = SecondProcessor { logs: Arc::clone(&second_processor_logs), }; + let third_processor = ThirdProcessor { + logs: Arc::clone(&third_processor_logs), + }; let logger_provider = LoggerProvider::builder() .with_log_processor(first_processor) .with_log_processor(second_processor) + .with_log_processor(third_processor) .build(); let logger = logger_provider.logger("test-logger"); @@ -1330,9 +1386,11 @@ mod tests { assert_eq!(first_processor_logs.lock().unwrap().len(), 1); assert_eq!(second_processor_logs.lock().unwrap().len(), 1); + assert_eq!(third_processor_logs.lock().unwrap().len(), 1); let first_log = &first_processor_logs.lock().unwrap()[0]; let second_log = &second_processor_logs.lock().unwrap()[0]; + let third_log = &third_processor_logs.lock().unwrap()[0]; assert!(first_log.0.attributes_contains( &Key::from_static_str("processed_by"), @@ -1340,7 +1398,11 @@ mod tests { )); assert!(second_log.0.attributes_contains( &Key::from_static_str("processed_by"), - &AnyValue::String("FirstProcessor".into()) + &AnyValue::String("SecondProcessor".into()) + )); + assert!(third_log.0.attributes_contains( + &Key::from_static_str("processed_by"), + &AnyValue::String("ThirdProcessor".into()) )); assert!( @@ -1351,6 +1413,10 @@ mod tests { second_log.0.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); + assert!( + third_log.0.body.clone().unwrap() + == AnyValue::String("Updated by FirstProcessor".into()) + ); } #[test] diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 3e4f5c8c18..3524f977cd 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -1,4 +1,5 @@ use crate::growable_array::GrowableArray; +use opentelemetry::logs::LogRecord as _; use opentelemetry::{ logs::{AnyValue, Severity}, trace::{SpanContext, SpanId, TraceFlags, TraceId}, @@ -186,6 +187,73 @@ impl LogRecord { .flatten() .any(|(k, v)| k == key && v == value) } + + /// Updates the first occurrence of an attribute with the specified key. + /// + /// This method searches for the first occurrence of the attribute with the given key + /// in the `attributes` collection. If the key is found, its value is updated with the + /// provided value. If the key is not found, the attribute is added. + /// + /// # Arguments + /// + /// - `key`: A reference to the key of the attribute to update. + /// - `value`: A new value for the attribute. + /// + /// # Returns + /// + /// - `Some(AnyValue)`: The old value of the attribute if found and updated. + /// - `None`: If the attribute was not found, and a new one was added. + /// + pub fn update_attribute(&mut self, key: &Key, value: AnyValue) -> Option { + // First, search for the attribute mutably + if let Some(attr) = self + .attributes + .iter_mut() + .find(|opt| opt.as_ref().map(|(k, _)| k == key).unwrap_or(false)) + { + // Take the old value and update the attribute + let old_value = attr.take().map(|(_, v)| v); + *attr = Some((key.clone(), value)); + return old_value; + } + + // If not found, add a new attribute + self.add_attribute(key.clone(), value.clone()); + None + } + + /// Removes all occurrences of an attribute with the specified key. + /// + /// This method searches for all occurrences of the attribute with the given key + /// in the `attributes` collection and removes them. + /// + /// # Arguments + /// + /// - `key`: A reference to the key of the attribute to remove. + /// + /// # Returns + /// + /// - The number of removed occurrences of the key. + /// + pub fn remove_attribute(&mut self, key: &Key) -> usize { + let mut deleted_count = 0; + + // Loop to find and remove all occurrences + while let Some(index) = { + // Isolate the immutable borrow in a block scope + let position = self + .attributes + .iter() + .position(|opt| opt.as_ref().map(|(k, _)| k == key).unwrap_or(false)); + position + } { + // Now proceed with the mutable borrow and remove the item + self.attributes.remove_at(index); + deleted_count += 1; + } + + deleted_count + } } /// TraceContext stores the trace context for logs that have an associated @@ -214,7 +282,7 @@ impl From<&SpanContext> for TraceContext { #[cfg(all(test, feature = "testing"))] mod tests { use super::*; - use opentelemetry::logs::{AnyValue, LogRecord as _, Severity}; + use opentelemetry::logs::{AnyValue, Severity}; use std::borrow::Cow; use std::time::SystemTime; @@ -354,4 +422,41 @@ mod tests { assert_eq!(log_record_borrowed, log_record_owned); } + + #[test] + fn test_update_attribute() { + let mut log_record = LogRecord::default(); + let key = Key::new("key1"); + let value = AnyValue::String("value1".into()); + let updated_value = AnyValue::String("updated_value".into()); + + // Add a new attribute + assert!(log_record.update_attribute(&key, value.clone()).is_none()); + assert!(log_record.attributes_contains(&key, &value)); + + // Update the existing attribute + assert_eq!( + log_record.update_attribute(&key, updated_value.clone()), + Some(value) + ); + assert!(log_record.attributes_contains(&key, &updated_value)); + } + + #[test] + fn test_delete_attribute() { + let mut log_record = LogRecord::default(); + let key = Key::new("key1"); + let value = AnyValue::String("value1".into()); + + // Add an attribute + log_record.add_attribute(key.clone(), value.clone()); + assert!(log_record.attributes_contains(&key, &value)); + + // Delete the attribute + let del_count = log_record.remove_attribute(&key); + assert_eq!(del_count, 1); + + // Ensure it is deleted + assert!(!log_record.attributes_contains(&key, &value)); + } }