diff --git a/bin/tests/it/cli.rs b/bin/tests/it/cli.rs index 7b85e2556..0a38d7c9b 100644 --- a/bin/tests/it/cli.rs +++ b/bin/tests/it/cli.rs @@ -1573,7 +1573,6 @@ async fn test_line_redact() { test_line_rules(None, None, redact, to_write, expected).await; } -#[ignore] #[tokio::test] async fn test_directory_created_after_initialization() { let _ = env_logger::Builder::from_default_env().try_init(); @@ -1591,6 +1590,10 @@ async fn test_directory_created_after_initialization() { let file_path = future_dir.join("test.log"); std::fs::create_dir(&future_dir).unwrap(); File::create(&file_path).unwrap(); + + // Wait for file to be picked up by agent + tokio::time::sleep(Duration::from_millis(500)).await; + common::append_to_file(&file_path, 10, 5).unwrap(); common::force_client_to_flush(&future_dir).await; diff --git a/common/config/src/lib.rs b/common/config/src/lib.rs index e41f5abcb..e7e501b3e 100755 --- a/common/config/src/lib.rs +++ b/common/config/src/lib.rs @@ -341,12 +341,12 @@ impl TryFrom for Config { .log .dirs .into_iter() - // Filter off paths that are not directories and warn about them + // Find valid directory paths and keep track of missing paths .filter_map(|d| { d.clone() .try_into() .map_err(|e| { - warn!("{} is not a valid directory {}", d.display(), e); + warn!("{} is not a valid directory: {}", d.display(), e); }) .ok() }) diff --git a/common/fs/src/cache/dir_path.rs b/common/fs/src/cache/dir_path.rs index 864219a65..4b1aed850 100644 --- a/common/fs/src/cache/dir_path.rs +++ b/common/fs/src/cache/dir_path.rs @@ -8,13 +8,16 @@ pub enum DirPathBufError { NotADirPath(PathBuf), #[error("I/O error: {0}")] Io(#[from] std::io::Error), + #[error("directory config error occured")] + EmptyDirPath(Option), } // Strongly typed wrapper around PathBuf, cannot be constructed unless // the directory it's referring to exists -#[derive(std::fmt::Debug, Clone)] +#[derive(Eq, std::fmt::Debug, Clone, PartialEq)] pub struct DirPathBuf { - inner: PathBuf, + pub inner: PathBuf, + pub postfix: Option, } impl Deref for DirPathBuf { @@ -27,18 +30,18 @@ impl Deref for DirPathBuf { impl std::convert::TryFrom for DirPathBuf { type Error = DirPathBufError; fn try_from(path: PathBuf) -> Result { - //TODO: We want to allow paths that are not yet present: LOG-10041 - // For now, prevent validation on Windows #[cfg(unix)] - if std::fs::canonicalize(&path)?.is_dir() { - Ok(DirPathBuf { inner: path }) - } else { - Err(DirPathBufError::NotADirPath(path)) + match find_valid_path(Some(path.clone()), None) { + Ok(p) => Ok(p), + _ => Err(DirPathBufError::NotADirPath(path)), } #[cfg(windows)] { - Ok(DirPathBuf { inner: path }) + Ok(DirPathBuf { + inner: path, + postfix: None, + }) } } } @@ -48,7 +51,10 @@ impl std::convert::TryFrom<&Path> for DirPathBuf { fn try_from(path: &Path) -> Result { #[cfg(unix)] if path.is_dir() { - Ok(DirPathBuf { inner: path.into() }) + Ok(DirPathBuf { + inner: path.into(), + postfix: None, + }) } else { path.to_str().map_or_else( || Err("path is not a directory and cannot be formatted".into()), @@ -58,7 +64,10 @@ impl std::convert::TryFrom<&Path> for DirPathBuf { #[cfg(windows)] { - Ok(DirPathBuf { inner: path.into() }) + Ok(DirPathBuf { + inner: path.into(), + postfix: None, + }) } } } @@ -74,3 +83,183 @@ impl From for PathBuf { d.inner } } + +/// If a path does not exist, the function continue to move to the parent directory +/// until it finds a valid directory. +/// +/// When a valid directory is found, it adds the missing piece of the directory +/// path to `postfix` and returns the `DirPathBuf` result. +fn find_valid_path( + path: Option, + postfix: Option, +) -> Result { + match path { + Some(p) => { + if matches!(std::fs::canonicalize(&p), Ok(_)) { + Ok(DirPathBuf { inner: p, postfix }) + } else { + warn!("{:?} is not a directory; moving to parent directory", p); + let root_pathbuf = Path::new("/"); + let mut postfix_pathbuf = PathBuf::new(); + let mut pop_pathbuf = PathBuf::new(); + if let Some(pop) = postfix { + pop_pathbuf.push(pop); + } + let parent = match level_up(&p) { + Some(p) => { + if p == root_pathbuf { + warn!("root level directory was missing, as a result configured directory recursed to the root level!"); + } + p + } + None => return Err(DirPathBufError::NotADirPath(p)), + }; + + let tmp_dir = parent; + let postfix_path = p.strip_prefix(tmp_dir).ok(); + postfix_pathbuf.push(postfix_path.unwrap()); + postfix_pathbuf.push(pop_pathbuf); + find_valid_path(level_up(&p), Some(postfix_pathbuf)) + } + } + None => Err(DirPathBufError::EmptyDirPath(path)), + } +} + +fn level_up(path: &Path) -> Option { + let mut parent_path = PathBuf::new(); + match path.parent() { + Some(p) => { + parent_path.push(p); + Some(parent_path) + } + None => None, + } +} + +#[cfg(test)] +mod tests { + use std::env::temp_dir; + + use super::*; + + #[test] + fn test_level_up() { + let mut expe_pathbuf = PathBuf::new(); + expe_pathbuf.push("/test-directory"); + + let new_pathbuf = level_up(Path::new("/test-directory/sub-directory")); + assert_eq!(expe_pathbuf, new_pathbuf.unwrap()); + + let invalid_path = level_up(Path::new("/")); + assert_eq!(None, invalid_path); + + let root_path = level_up(Path::new("")); + assert_eq!(None, root_path); + } + + #[test] + fn test_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + let mut expected_pathbuff = PathBuf::new(); + expected_pathbuff.push(Path::new("sub-test-path")); + + let tmp_dir = temp_dir(); + let tmp_test_dir = tmp_dir.join("test-dir-0"); + std::fs::create_dir(&tmp_test_dir).expect("could not create tmp directory"); + assert!(tmp_test_dir.is_dir()); + + test_path.push(tmp_test_dir.join("sub-test-path")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert_eq!(expected_pathbuff, test_result.unwrap().postfix.unwrap()); + + // Clean up + std::fs::remove_dir(&tmp_test_dir).expect("could not remove tmp directory"); + assert!(!tmp_test_dir.is_dir()); + } + + #[test] + fn test_deep_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + let mut expected_pathbuff = PathBuf::new(); + expected_pathbuff.push(Path::new("sub-path/sub-sub-path")); + + let tmp_dir = temp_dir(); + let tmp_test_dir = tmp_dir.join("test-dir-1"); + std::fs::create_dir(&tmp_test_dir).expect("could not create tmp directory"); + assert!(tmp_test_dir.is_dir()); + + test_path.push(tmp_test_dir.join("sub-path/sub-sub-path")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert_eq!(expected_pathbuff, test_result.unwrap().postfix.unwrap()); + + // Clean up + std::fs::remove_dir(&tmp_test_dir).expect("could not remove tmp directory"); + assert!(!tmp_test_dir.is_dir()); + } + + #[test] + fn test_filename_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + let mut expected_pathbuff = PathBuf::new(); + expected_pathbuff.push(Path::new("test_file.log/")); + + let tmp_dir = temp_dir(); + let tmp_test_dir = tmp_dir.join("test-dir-2"); + std::fs::create_dir(&tmp_test_dir).expect("could not create tmp directory"); + assert!(tmp_test_dir.is_dir()); + + test_path.push(tmp_test_dir.join("test_file.log")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert_eq!(expected_pathbuff, test_result.unwrap().postfix.unwrap()); + + // Clean up + std::fs::remove_dir(&tmp_test_dir).expect("could not remove tmp directory"); + assert!(!tmp_test_dir.is_dir()); + } + + #[test] + fn test_empty_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + + test_path.push(Path::new("")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert!(test_result.is_err()); + } + + #[test] + fn test_invalid_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + + test_path.push(Path::new("ivc:")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert!(test_result.is_err()); + } + + #[test] + fn test_root_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + + test_path.push(Path::new("/")); + let test_result = find_valid_path(Some(test_path), Some(test_postfix)); + assert_eq!(Path::new(""), test_result.unwrap().postfix.unwrap()); + } + + #[test] + fn test_root_lvl_find_valid_path() { + let mut test_path = PathBuf::new(); + let test_postfix = PathBuf::new(); + let mut expected_pathbuff = PathBuf::new(); + expected_pathbuff.push(Path::new("does-not-exist")); + + test_path.push(Path::new("/does-not-exist")); + let result = find_valid_path(Some(test_path), Some(test_postfix)); + assert_eq!(expected_pathbuff, result.unwrap().postfix.unwrap()); + } +} diff --git a/common/fs/src/cache/mod.rs b/common/fs/src/cache/mod.rs index 84c2eeb98..7cc030acd 100644 --- a/common/fs/src/cache/mod.rs +++ b/common/fs/src/cache/mod.rs @@ -210,6 +210,7 @@ fn get_resume_events( pub struct FileSystem { watcher: Watcher, + missing_dir_watcher: Option, pub entries: Rc>, symlinks: Symlinks, watch_descriptors: WatchDescriptors, @@ -217,7 +218,7 @@ pub struct FileSystem { master_rules: Rules, initial_dirs: Vec, initial_dir_rules: Rules, - + missing_dirs: Vec, initial_events: Vec, lookback_config: Lookback, @@ -262,19 +263,54 @@ impl FileSystem { ) -> Self { let (resume_events_send, resume_events_recv) = async_channel::unbounded(); initial_dirs.iter().for_each(|path| { - if !path.is_dir() { + if !path.inner.is_dir() { panic!("initial dirs must be dirs") } }); + + let mut missing_dirs: Vec = Vec::new(); + let watcher = Watcher::new(delay); + let mut missing_dir_watcher = Watcher::new(delay); let entries = SlotMap::new(); let mut initial_dir_rules = Rules::new(); let ignored_dirs = HashSet::new(); + // Adds initial directories and constructs missing directory + // vector and adds prefix path to the missing directory watcher for path in initial_dirs.iter() { - add_initial_dir_rules(&mut initial_dir_rules, path); + if path.postfix.is_none() { + add_initial_dir_rules(&mut initial_dir_rules, path); + } else { + let mut full_missing_path = PathBuf::new(); + let root_pathbuf = Path::new("/"); + let mut format_postfix = + String::from(path.postfix.as_ref().unwrap().to_str().unwrap()); + if format_postfix.ends_with('/') { + format_postfix.pop(); + } + if path.inner == root_pathbuf { + full_missing_path.push(format!("{}{}", &path.inner.display(), format_postfix)); + } else { + full_missing_path.push(format!("{}/{}", &path.inner.display(), format_postfix)); + } + let full_missing_dirpathbuff = DirPathBuf { + inner: full_missing_path.clone(), + postfix: None, + }; + + // Add missing directory to Rules + add_initial_dir_rules(&mut initial_dir_rules, &full_missing_dirpathbuff); + + info!("adding {:?} to missing directory watcher", path.inner); + missing_dirs.push(full_missing_path); + missing_dir_watcher + .watch(&path.inner, RecursiveMode::NonRecursive) + .expect("Could not add path to missing directory watcher"); + } } + debug!("initial directory rules: {:?}\n", initial_dir_rules); let mut fs = Self { entries: Rc::new(RefCell::new(entries)), @@ -284,9 +320,11 @@ impl FileSystem { master_rules: rules, initial_dirs: initial_dirs.clone(), initial_dir_rules, + missing_dirs, lookback_config, initial_offsets, watcher, + missing_dir_watcher: Some(missing_dir_watcher), initial_events: Vec::new(), resume_events_recv, resume_events_send, @@ -296,6 +334,7 @@ impl FileSystem { let entries = fs.entries.clone(); let mut entries = entries.borrow_mut(); + // Initial dirs let mut initial_dir_events = Vec::new(); for dir in initial_dirs .into_iter() @@ -339,6 +378,7 @@ impl FileSystem { } } } + for event in initial_dir_events { match event { Event::New(entry_key) => fs.initial_events.push(Event::Initialize(entry_key)), @@ -348,6 +388,7 @@ impl FileSystem { fs } + // Stream events pub fn stream_events( fs: Arc>, ) -> Result, EventTimestamp)>, std::io::Error> { @@ -359,6 +400,68 @@ impl FileSystem { watcher.receive() }; + let mfs = fs.clone(); // clone fs for missing files + let missing_dirs = mfs + .try_lock() + .expect("could not lock filesystem cache") + .missing_dirs + .clone(); + let missing_dir_watcher = mfs + .try_lock() + .expect("could not lock filesystem cache") + .missing_dir_watcher + .take() + .unwrap_or_else(|| Watcher::new(Duration::new(0, 10000000))); + + let missing_dir_event_stream = missing_dir_watcher.receive(); + + let missing_dir_event = futures::stream::unfold( + ( + mfs, + missing_dirs, + missing_dir_watcher, + Box::pin(missing_dir_event_stream), + ), + move |(mfs, missing, mut watcher, mut stream)| async move { + loop { + let (event, _) = stream.next().await?; + debug!("missing directory watcher event: {:?}", event); + if let notify_stream::Event::Create(ref path) = event { + if missing.contains(path) { + // Got a complete directory match, inserting it + info!("missing directory {:?} was found!", path); + return Some(( + as_event_stream(mfs.clone(), event, OffsetDateTime::now_utc()), + (mfs, missing, watcher, stream), + )); + } + if missing.iter().any(|m| m.starts_with(&path)) { + info!("found sub-path of missing directory {:?}", path); + for dir in missing.iter() { + // Check if full path was created along with sub-path + if dir.exists() { + info!("full path exists {:?}", dir); + let create_event = WatchEvent::Create(dir.clone()); + return Some(( + as_event_stream( + mfs.clone(), + create_event, + OffsetDateTime::now_utc(), + ), + (mfs, missing, watcher, stream), + )); + } + watcher + .watch(path, RecursiveMode::NonRecursive) + .expect("Could not add inital value to missing_dir_watch"); + } + } + } + } + }, + ) + .flatten(); + let initial_events = get_initial_events(&fs); let resume_events_recv = get_resume_events(&fs); let events = futures::stream::select(resume_events_recv, events_stream) @@ -370,7 +473,10 @@ impl FileSystem { }) .flatten(); - Ok(initial_events.chain(events)) + Ok(futures::stream::select( + initial_events.chain(events), + missing_dir_event, + )) } /// Handles inotify events and may produce Event(s) that are returned upstream through sender diff --git a/common/notify_stream/src/lib.rs b/common/notify_stream/src/lib.rs index e88eddd33..a0a1ac7f4 100644 --- a/common/notify_stream/src/lib.rs +++ b/common/notify_stream/src/lib.rs @@ -136,9 +136,9 @@ impl Watcher { } /// Starts receiving the watcher events - pub fn receive(&self) -> impl Stream { + pub fn receive(&self) -> impl Stream + Unpin { let rx = Rc::clone(&self.rx); - stream::unfold(rx, |rx| async move { + Box::pin(stream::unfold(rx, |rx| async move { loop { let received = rx.recv().await.expect("channel can not be closed"); log::trace!("received raw notify event: {:?}", received); @@ -160,7 +160,7 @@ impl Watcher { return Some(((mapped_event, OffsetDateTime::now_utc()), rx)); } } - }) + })) } }