Skip to content

Commit

Permalink
Define loggers in terms of container builders (#615)
Browse files Browse the repository at this point in the history
* Define loggers in terms of container builders

The logging infrastructure had some old assumptions built in, such as the
container type and the size of buffers. With this change, we defer to the
container builder pattern to re-use the existing infrastructure. This also
allows us to switch the container type to something else if we'd like to do
so.

Signed-off-by: Moritz Hoffmann <[email protected]>

* Make the communication log event builder public

Signed-off-by: Moritz Hoffmann <[email protected]>

* Back out some changes, introduce typed logger

Signed-off-by: Moritz Hoffmann <[email protected]>

* Cleanup

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jan 9, 2025
1 parent 486c988 commit 452226b
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 123 deletions.
1 change: 1 addition & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ getopts = { version = "0.2.21", optional = true }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
timely_bytes = { path = "../bytes", version = "0.12" }
timely_container = { path = "../container", version = "0.13.0" }
timely_logging = { path = "../logging", version = "0.13" }
crossbeam-channel = "0.5"
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Network initialization.
use std::sync::Arc;
// use crate::allocator::Process;
use timely_logging::Logger;
use crate::allocator::process::ProcessBuilder;
use crate::logging::CommunicationEventBuilder;
use crate::networking::create_sockets;
use super::tcp::{send_loop, recv_loop};
use super::allocator::{TcpBuilder, new_vector};
Expand Down Expand Up @@ -30,16 +31,15 @@ impl Drop for CommsGuard {
}
}

use crate::logging::{CommunicationSetup, CommunicationEvent};
use timely_logging::Logger;
use crate::logging::CommunicationSetup;

/// Initializes network connections
pub fn initialize_networking(
addresses: Vec<String>,
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand All @@ -58,7 +58,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand Down
9 changes: 5 additions & 4 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::stream::Stream;

use timely_logging::Logger;

use crate::logging::{CommunicationEvent, MessageEvent, StateEvent};
use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};

fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
// NOTE: some downstream crates sniff out "timely communication error:" from
Expand All @@ -35,10 +35,11 @@ pub fn recv_loop<S>(
worker_offset: usize,
process: usize,
remote: usize,
mut logger: Option<Logger<CommunicationEvent>>)
logger: Option<Logger<CommunicationEventBuilder>>)
where
S: Stream,
{
let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
// Log the receive thread's start.
logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));

Expand Down Expand Up @@ -134,9 +135,9 @@ pub fn send_loop<S: Stream>(
sources: Vec<Sender<MergeQueue>>,
process: usize,
remote: usize,
mut logger: Option<Logger<CommunicationEvent>>)
logger: Option<Logger<CommunicationEventBuilder>>)
{

let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
// Log the send thread's start.
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));

Expand Down
16 changes: 7 additions & 9 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
use std::thread;
#[cfg(feature = "getopts")]
use std::io::BufRead;
#[cfg(feature = "getopts")]
use getopts;
use std::sync::Arc;

use std::fmt::{Debug, Formatter};
use std::any::Any;

#[cfg(feature = "getopts")]
use getopts;
use timely_logging::Logger;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
use crate::allocator::zero_copy::initialize::initialize_networking;

use crate::logging::{CommunicationSetup, CommunicationEvent};
use timely_logging::Logger;
use std::fmt::{Debug, Formatter};

use crate::logging::{CommunicationEventBuilder, CommunicationSetup};

/// Possible configurations for the communication infrastructure.
#[derive(Clone)]
Expand All @@ -39,7 +37,7 @@ pub enum Config {
/// Verbosely report connection process
report: bool,
/// Closure to create a new logger for a communication thread
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent>> + Send + Sync>,
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
}
}

Expand Down
4 changes: 4 additions & 0 deletions communication/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use columnar::Columnar;
use serde::{Serialize, Deserialize};
use timely_container::CapacityContainerBuilder;

/// Configuration information about a communication thread.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
Expand Down Expand Up @@ -53,3 +54,6 @@ impl From<MessageEvent> for CommunicationEvent {
impl From<StateEvent> for CommunicationEvent {
fn from(v: StateEvent) -> CommunicationEvent { CommunicationEvent::State(v) }
}

/// Builder for communication log events.
pub type CommunicationEventBuilder = CapacityContainerBuilder<Vec<(std::time::Duration, CommunicationEvent)>>;
3 changes: 3 additions & 0 deletions logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ homepage = "https://github.com/TimelyDataflow/timely-dataflow"
repository = "https://github.com/TimelyDataflow/timely-dataflow.git"
keywords = ["timely", "dataflow", "logging"]
license = "MIT"

[dependencies]
timely_container = { version = "0.13.0", path = "../container" }
Loading

0 comments on commit 452226b

Please sign in to comment.