Skip to content

Commit

Permalink
server: simplify initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Mar 30, 2024
1 parent 4eaaeee commit f81ac40
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 39 deletions.
12 changes: 7 additions & 5 deletions src/bin/toydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use serde_derive::Deserialize;
use std::collections::HashMap;
use tokio::net::TcpListener;
use toydb::error::{Error, Result};
use toydb::raft;
use toydb::sql;
Expand Down Expand Up @@ -56,11 +57,12 @@ async fn main() -> Result<()> {
name => return Err(Error::Config(format!("Unknown SQL storage engine {}", name))),
};

Server::new(cfg.id, cfg.peers, raft_log, raft_state)?
.listen(&cfg.listen_sql, &cfg.listen_raft)
.await?
.serve()
.await
let srv = Server::new(cfg.id, cfg.peers, raft_log, raft_state)?;

let raft_listener = TcpListener::bind(&cfg.listen_raft).await?;
let sql_listener = TcpListener::bind(&cfg.listen_sql).await?;

srv.serve(raft_listener, sql_listener).await
}

#[derive(Debug, Deserialize)]
Expand Down
34 changes: 9 additions & 25 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{Error, Result};
use crate::error::Result;
use crate::raft;
use crate::sql;
use crate::sql::engine::Engine as _;
Expand All @@ -19,8 +19,6 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec};
/// A toyDB server.
pub struct Server {
raft: raft::Server,
raft_listener: Option<TcpListener>,
sql_listener: Option<TcpListener>,
}

impl Server {
Expand All @@ -31,31 +29,17 @@ impl Server {
raft_log: raft::Log,
raft_state: Box<dyn raft::State>,
) -> Result<Self> {
Ok(Server {
raft: raft::Server::new(id, peers, raft_log, raft_state)?,
raft_listener: None,
sql_listener: None,
})
}

/// Starts listening on the given ports. Must be called before serve.
pub async fn listen(mut self, sql_addr: &str, raft_addr: &str) -> Result<Self> {
let (sql, raft) =
tokio::try_join!(TcpListener::bind(sql_addr), TcpListener::bind(raft_addr),)?;
info!("Listening on {} (SQL) and {} (Raft)", sql.local_addr()?, raft.local_addr()?);
self.sql_listener = Some(sql);
self.raft_listener = Some(raft);
Ok(self)
Ok(Server { raft: raft::Server::new(id, peers, raft_log, raft_state)? })
}

/// Serves Raft and SQL requests until the returned future is dropped. Consumes the server.
pub async fn serve(self) -> Result<()> {
let sql_listener = self
.sql_listener
.ok_or_else(|| Error::Internal("Must listen before serving".into()))?;
let raft_listener = self
.raft_listener
.ok_or_else(|| Error::Internal("Must listen before serving".into()))?;
pub async fn serve(self, raft_listener: TcpListener, sql_listener: TcpListener) -> Result<()> {
info!(
"Listening on {} (SQL) and {} (Raft)",
sql_listener.local_addr()?,
raft_listener.local_addr()?
);

let (raft_tx, raft_rx) = mpsc::unbounded_channel();
let sql_engine = sql::engine::Raft::new(raft_tx);

Expand Down
19 changes: 10 additions & 9 deletions tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::time::Duration;
use tempdir::TempDir;
use tokio::net::TcpListener;

// Movie data
pub fn movies() -> Vec<&'static str> {
Expand Down Expand Up @@ -76,15 +77,15 @@ pub async fn server(
peers: HashMap<raft::NodeID, String>,
) -> Result<Teardown> {
let dir = TempDir::new("toydb")?;
let mut srv = Server::new(
id,
peers,
raft::Log::new(storage::engine::BitCask::new(dir.path().join("log"))?, false)?,
Box::new(sql::engine::Raft::new_state(storage::engine::Memory::new())?),
)?;

srv = srv.listen(addr_sql, addr_raft).await?;
let (task, abort) = srv.serve().remote_handle();
let raft_log = raft::Log::new(storage::engine::BitCask::new(dir.path().join("log"))?, false)?;
let raft_state = Box::new(sql::engine::Raft::new_state(storage::engine::Memory::new())?);
let raft_listener = TcpListener::bind(addr_raft).await?;
let sql_listener = TcpListener::bind(addr_sql).await?;

let (task, abort) = Server::new(id, peers, raft_log, raft_state)?
.serve(raft_listener, sql_listener)
.remote_handle();

tokio::spawn(task);

Ok(Teardown::new(move || {
Expand Down

0 comments on commit f81ac40

Please sign in to comment.