Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GetEpoch local state query #320

Closed
wants to merge 13 commits into from
59 changes: 38 additions & 21 deletions examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ use pallas::network::{
};
use tracing::info;

async fn do_localstate_query(client: &mut NodeClient) {
client.statequery().acquire(None).await.unwrap();
async fn do_localstate_query(client: &mut NodeClient, query: localstate::queries::Request) {
do_localstate_query_acquisition(client).await;

let result = client
.statequery()
.query(localstate::queries::Request::GetSystemStart)
.await
.unwrap();
let result = client.statequery().query(query).await.unwrap();

info!("result: {:?}", result);

client.statequery().send_release().await.unwrap();
}

info!("system start result: {:?}", result);
async fn do_localstate_query_acquisition(client: &mut NodeClient) {
if let localstate::State::Idle = client.statequery().state() {
client.statequery().acquire(None).await.unwrap();
}
}

async fn do_chainsync(client: &mut NodeClient) {
Expand Down Expand Up @@ -43,16 +47,7 @@ async fn do_chainsync(client: &mut NodeClient) {
}
}

#[cfg(target_family = "unix")]
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();

async fn setup_client() -> NodeClient {
// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let socket_path = "/tmp/node.socket";
Expand All @@ -61,14 +56,36 @@ async fn main() {
let version_table = NodeClient::handshake_query(socket_path, MAINNET_MAGIC)
.await
.unwrap();

info!("handshake query result: {:?}", version_table);

let mut client = NodeClient::connect(socket_path, MAINNET_MAGIC)
NodeClient::connect(socket_path, MAINNET_MAGIC)
.await
.unwrap();
.unwrap()
}

#[cfg(target_family = "unix")]
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();

let mut client = setup_client().await;

// specify the query we want to execute
let get_system_start_query = localstate::queries::Request::GetSystemStart;
let get_epoch_query =
localstate::queries::Request::BlockQuery(localstate::queries::BlockQuery::GetEpochNo);

// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut client).await;
do_localstate_query(&mut client, get_system_start_query).await;
do_localstate_query(&mut client, get_epoch_query).await;

client.statequery().send_done().await.unwrap();

// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut client).await;
Expand Down
5 changes: 4 additions & 1 deletion pallas-network/src/miniprotocols/localstate/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ where
pub async fn send_message(&mut self, msg: &Message<Q>) -> Result<(), ClientError> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).await.map_err(ClientError::Plexer)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(ClientError::Plexer)?;

Ok(())
}
Expand Down
55 changes: 28 additions & 27 deletions pallas-network/src/miniprotocols/localstate/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,27 +122,26 @@ impl Encode<()> for BlockQuery {
BlockQuery::GetRewardInfoPools => {
e.array(1)?;
e.u16(18)?;
}
// BlockQuery::GetPoolState(()) => {
// e.array(X)?;
// e.u16(19)?;
// }
// BlockQuery::GetStakeSnapshots(()) => {
// e.array(X)?;
// e.u16(20)?;
// }
// BlockQuery::GetPoolDistr(()) => {
// e.array(X)?;
// e.u16(21)?;
// }
// BlockQuery::GetStakeDelegDeposits(()) => {
// e.array(X)?;
// e.u16(22)?;
// }
// BlockQuery::GetConstitutionHash => {
// e.array(1)?;
// e.u16(23)?;
// }
} // BlockQuery::GetPoolState(()) => {
// e.array(X)?;
// e.u16(19)?;
// }
// BlockQuery::GetStakeSnapshots(()) => {
// e.array(X)?;
// e.u16(20)?;
// }
// BlockQuery::GetPoolDistr(()) => {
// e.array(X)?;
// e.u16(21)?;
// }
// BlockQuery::GetStakeDelegDeposits(()) => {
// e.array(X)?;
// e.u16(22)?;
// }
// BlockQuery::GetConstitutionHash => {
// e.array(1)?;
// e.u16(23)?;
// }
}
Ok(())
}
Expand Down Expand Up @@ -241,11 +240,9 @@ impl<'b> Decode<'b, ()> for Request {
(1, 1) => Ok(Self::GetSystemStart),
(1, 2) => Ok(Self::GetChainBlockNo),
(1, 3) => Ok(Self::GetChainPoint),
_ => {
return Err(decode::Error::message(
"invalid (size, tag) for lsq request",
))
}
_ => Err(decode::Error::message(
"invalid (size, tag) for lsq request",
)),
}
}
}
Expand All @@ -258,6 +255,10 @@ impl GenericResponse {
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}

pub fn bytes(&self) -> &[u8] {
&self.0
}
}

impl Encode<()> for GenericResponse {
Expand All @@ -268,7 +269,7 @@ impl Encode<()> for GenericResponse {
) -> Result<(), encode::Error<W::Error>> {
e.writer_mut()
.write_all(&self.0)
.map_err(|e| encode::Error::write(e))
.map_err(encode::Error::write)
}
}

Expand Down
Loading