Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested parsers on MessageProducer level (v3) #2084

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
15 changes: 7 additions & 8 deletions application/apps/indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion application/apps/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ thiserror = "1.0"
lazy_static = "1.4"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
dlt-core = "0.14"
# dlt-core = "0.14"
# TODO https://github.com/esrlabs/dlt-core/pull/24
dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces" }
crossbeam-channel = "0.5"
futures = "0.3"
tokio-util = "0.7"
Expand Down
12 changes: 9 additions & 3 deletions application/apps/indexer/indexer_cli/src/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use futures::{pin_mut, stream::StreamExt};
use parsers::{dlt::DltParser, MessageStreamItem, ParseYield};
use processor::grabber::LineRange;
use rustyline::{error::ReadlineError, DefaultEditor};
use session::session::Session;
use session::{
parse_rest_resolver::{resolve_log_msg, ParseRestReslover},
session::Session,
};
use sources::{
factory::{DltParserSettings, FileFormat, ObserveOptions, ParserType},
producer::MessageProducer,
Expand Down Expand Up @@ -46,6 +49,7 @@ pub(crate) async fn handle_interactive_session(input: Option<PathBuf>) {
let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap();
let dlt_parser = DltParser::new(None, None, None, false);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None);
let mut parse_reslover = ParseRestReslover::new();
let msg_stream = dlt_msg_producer.as_stream();
pin_mut!(msg_stream);
loop {
Expand All @@ -56,10 +60,12 @@ pub(crate) async fn handle_interactive_session(input: Option<PathBuf>) {
}
item = msg_stream.next() => {
match item {
Some((_, MessageStreamItem::Item(ParseYield::Message(msg)))) => {
Some((_, MessageStreamItem::Item(ParseYield::Message(item)))) => {
let msg = resolve_log_msg(item, &mut parse_reslover);
println!("msg: {msg}");
}
Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((msg, attachment))))) => {
Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((item, attachment))))) => {
let msg = resolve_log_msg(item, &mut parse_reslover);
println!("msg: {msg}, attachment: {attachment:?}");
}
Some((_, MessageStreamItem::Item(ParseYield::Attachment(attachment)))) => {
Expand Down
4 changes: 3 additions & 1 deletion application/apps/indexer/parsers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ rand.workspace = true
# someip-messages = { path = "../../../../../someip"}
someip-messages = { git = "https://github.com/esrlabs/someip" }
# someip-payload = { path = "../../../../../someip-payload" }
someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
# someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
# TODO
someip-payload = { git = "https://github.com/kruss/someip-payload.git", branch = "robustness" }

[dev-dependencies]
stringreader = "0.1.1"
119 changes: 99 additions & 20 deletions application/apps/indexer/parsers/src/dlt/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ use log::trace;
use serde::ser::{Serialize, SerializeStruct, Serializer};

use std::{
fmt::{self, Formatter},
fmt::{self, Formatter, Write},
str,
};

use crate::{LogMessage, LogMessageContent, ResolveParseHint, TemplateLogMsg, TemplateLogMsgChunk};

const DLT_COLUMN_SENTINAL: char = '\u{0004}';
const DLT_ARGUMENT_SENTINAL: char = '\u{0005}';
const DLT_NEWLINE_SENTINAL_SLICE: &[u8] = &[0x6];
Expand Down Expand Up @@ -281,6 +283,17 @@ impl<'a> Serialize for FormattableMessage<'a> {
None => state.serialize_field("payload", "[Unknown CtrlCommand]")?,
}
}
PayloadContent::NetworkTrace(slices) => {
state.serialize_field("app-id", &ext_header_app_id)?;
state.serialize_field("context-id", &ext_header_context_id)?;
state.serialize_field("message-type", &ext_header_msg_type)?;
let arg_string = slices
.iter()
.map(|slice| format!("{:02X?}", slice))
.collect::<Vec<String>>()
.join("|");
state.serialize_field("payload", &arg_string)?;
}
}
state.end()
}
Expand Down Expand Up @@ -386,12 +399,25 @@ impl<'a> FormattableMessage<'a> {
payload_string,
))
}
PayloadContent::NetworkTrace(slices) => {
let payload_string = slices
.iter()
.map(|slice| format!("{:02X?}", slice))
.collect::<Vec<String>>()
.join("|");
Ok(PrintableMessage::new(
ext_h_app_id,
eh_ctx_id,
ext_h_msg_type,
payload_string,
))
}
}
}

fn write_app_id_context_id_and_message_type(
&self,
f: &mut fmt::Formatter,
f: &mut impl std::fmt::Write,
) -> Result<(), fmt::Error> {
match self.message.extended_header.as_ref() {
Some(ext) => {
Expand Down Expand Up @@ -419,7 +445,7 @@ impl<'a> FormattableMessage<'a> {
&self,
id: u32,
data: &[u8],
f: &mut fmt::Formatter,
f: &mut impl std::fmt::Write,
) -> fmt::Result {
trace!("format_nonverbose_data");
let mut fibex_info_added = false;
Expand Down Expand Up @@ -511,7 +537,14 @@ impl<'a> FormattableMessage<'a> {
}
}

impl<'a> fmt::Display for FormattableMessage<'a> {
impl LogMessage for FormattableMessage<'_> {
fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let bytes = self.message.as_bytes();
let len = bytes.len();
writer.write_all(&bytes)?;
Ok(len)
}

/// will format dlt Message with those fields:
/// ********* storage-header ********
/// date-time
Expand All @@ -528,43 +561,89 @@ impl<'a> fmt::Display for FormattableMessage<'a> {
/// context-id
///
/// payload
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
fn try_resolve(&self) -> LogMessageContent {
let mut msg = String::new();
// Taken from Documentation: string formatting is considered an infallible operation.
// Thus we can ignore `fmt::Error` errors.
// Link from Clippy: 'https://rust-lang.github.io/rust-clippy/master/index.html#/format_push_string'
// TODO: Consider another way of concatenating the string after prototyping.
if let Some(h) = &self.message.storage_header {
let tz = self.options.map(|o| o.tz);
match tz {
Some(Some(tz)) => {
write_tz_string(f, &h.timestamp, &tz)?;
write!(f, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id)?;
let _ = write_tz_string(&mut msg, &h.timestamp, &tz);
let _ = write!(msg, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id);
}
_ => {
let _ = write!(msg, "{}", DltStorageHeader(h));
}
_ => write!(f, "{}", DltStorageHeader(h))?,
};
}
let header = DltStandardHeader(&self.message.header);
write!(f, "{DLT_COLUMN_SENTINAL}",)?;
write!(f, "{header}")?;
write!(f, "{DLT_COLUMN_SENTINAL}",)?;
write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap();
write!(msg, "{header}").unwrap();
write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap();

match &self.message.payload {
PayloadContent::Verbose(arguments) => {
self.write_app_id_context_id_and_message_type(f)?;
arguments
.iter()
.try_for_each(|arg| write!(f, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg)))
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
arguments.iter().for_each(|arg| {
let _ = write!(msg, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg));
});
}
PayloadContent::NonVerbose(id, data) => {
let _ = self.format_nonverbose_data(*id, data, &mut msg);
}
PayloadContent::NonVerbose(id, data) => self.format_nonverbose_data(*id, data, f),
PayloadContent::ControlMsg(ctrl_id, _data) => {
self.write_app_id_context_id_and_message_type(f)?;
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
match service_id_lookup(ctrl_id.value()) {
Some((name, _desc)) => write!(f, "[{name}]"),
None => write!(f, "[Unknown CtrlCommand]"),
Some((name, _desc)) => {
let _ = write!(msg, "[{name}]");
}
None => {
let _ = write!(msg, "[Unknown CtrlCommand]");
}
}
}
PayloadContent::NetworkTrace(slices) => {
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
let is_someip = self
.message
.extended_header
.as_ref()
.is_some_and(|ext_header| {
matches!(
ext_header.message_type,
MessageType::NetworkTrace(NetworkTraceType::Ipc)
| MessageType::NetworkTrace(NetworkTraceType::Someip)
)
});

if is_someip {
if let Some(slice) = slices.get(1) {
let template = TemplateLogMsg::new(
vec![
TemplateLogMsgChunk::Text(msg),
TemplateLogMsgChunk::Bytes(slice.to_owned()),
],
vec![ResolveParseHint::SomeIP],
);
return LogMessageContent::Template(template);
}
}

slices.iter().for_each(|slice| {
let _ = write!(msg, "{}{:02X?}", DLT_ARGUMENT_SENTINAL, slice);
});
}
}

msg.into()
}
}

fn write_tz_string(
f: &mut Formatter,
f: &mut impl std::fmt::Write,
time_stamp: &DltTimeStamp,
tz: &Tz,
) -> Result<(), fmt::Error> {
Expand Down
17 changes: 8 additions & 9 deletions application/apps/indexer/parsers/src/dlt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@ use std::{io::Write, ops::Range};

use self::{attachment::FtScanner, fmt::FormatOptions};

impl LogMessage for FormattableMessage<'_> {
fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let bytes = self.message.as_bytes();
let len = bytes.len();
writer.write_all(&bytes)?;
Ok(len)
}
}

#[derive(Debug, Serialize)]
pub struct RawMessage {
pub content: Vec<u8>,
Expand Down Expand Up @@ -54,6 +45,10 @@ impl LogMessage for RangeMessage {
writer.write_u64::<BigEndian>(self.range.end as u64)?;
Ok(8 + 8)
}

fn try_resolve(&self) -> crate::LogMessageContent {
self.into()
}
}

impl LogMessage for RawMessage {
Expand All @@ -62,6 +57,10 @@ impl LogMessage for RawMessage {
writer.write_all(&self.content)?;
Ok(len)
}

fn try_resolve(&self) -> crate::LogMessageContent {
self.into()
}
}

#[derive(Default)]
Expand Down
Loading