Skip to content

Commit

Permalink
Merge pull request #806 from worldcoin/piohei/restore_status
Browse files Browse the repository at this point in the history
Restore status for identities. Fix tests.
  • Loading branch information
piohei authored Oct 31, 2024
2 parents 4ae4ca4 + 5857512 commit ccc8227
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 109 deletions.
19 changes: 15 additions & 4 deletions e2e_tests/scenarios/tests/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,32 @@ pub async fn inclusion_proof_raw(
let result = String::from_utf8(bytes.into_iter().collect())
.expect("Could not parse response bytes to utf-8");

Ok(RawResponse {
let raw_response = RawResponse {
status_code: response.status(),
body: result,
})
};

debug!(
"Response status={}, body={}",
raw_response.status_code, raw_response.body
);

Ok(raw_response)
}

pub async fn inclusion_proof(
client: &Client<HttpConnector>,
uri: &String,
commitment: &Hash,
) -> anyhow::Result<InclusionProofResponse> {
) -> anyhow::Result<Option<InclusionProofResponse>> {
let result = inclusion_proof_raw(client, uri, commitment).await?;

if result.status_code == StatusCode::NOT_FOUND {
return Ok(None);
}

let result_json = serde_json::from_str::<InclusionProofResponse>(&result.body)
.expect("Failed to parse response as json");

Ok(result_json)
Ok(Some(result_json))
}
4 changes: 2 additions & 2 deletions e2e_tests/scenarios/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ pub async fn mined_inclusion_proof_with_retries(
for _i in 0..retries_count {
last_res = Some(inclusion_proof(client, uri, commitment).await?);

if let Some(ref inclusion_proof_json) = last_res {
if let Some(root) = inclusion_proof_json.0.root {
if let Some(Some(ref inclusion_proof_json)) = last_res {
if let Some(root) = inclusion_proof_json.root {
let (root, ..) = chain
.identity_manager
.query_root(root.into())
Expand Down
52 changes: 48 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use crate::identity::processor::{
};
use crate::identity::validator::IdentityValidator;
use crate::identity_tree::initializer::TreeInitializer;
use crate::identity_tree::{Hash, RootItem, TreeState, TreeVersionReadOps};
use crate::identity_tree::{
Hash, ProcessedStatus, RootItem, TreeState, TreeVersionReadOps, UnprocessedStatus,
};
use crate::prover::map::initialize_prover_maps;
use crate::prover::repository::ProverRepository;
use crate::prover::{ProverConfig, ProverType};
Expand Down Expand Up @@ -311,6 +313,20 @@ impl App {
return Err(ServerError::InvalidCommitment);
}

if self
.database
.get_unprocessed_commitment(commitment)
.await?
.is_some()
{
return Ok(InclusionProofResponse {
status: UnprocessedStatus::New.into(),
root: None,
proof: None,
message: None,
});
}

let item = self
.database
.get_identity_leaf_index(commitment)
Expand Down Expand Up @@ -370,16 +386,44 @@ impl App {
) -> Result<(), ServerError> {
let tree_state = self.tree_state()?;
let latest_root = tree_state.get_latest_tree().get_root();
let batching_root = tree_state.get_batching_tree().get_root();
let processed_root = tree_state.get_processed_tree().get_root();
let mined_root = tree_state.get_mined_tree().get_root();

info!("Validating age max_root_age: {max_root_age:?}");

let root = root_state.root;
if latest_root == root {
return Ok(());
match root_state.status {
// Pending status implies the batching or latest tree
ProcessedStatus::Pending if latest_root == root || batching_root == root => {
warn!("Root is pending - skipping");
return Ok(());
}
// Processed status is hidden - this should never happen
ProcessedStatus::Processed if processed_root == root => {
warn!("Root is processed - skipping");
return Ok(());
}
// Processed status is hidden, so it could be either processed or mined
ProcessedStatus::Mined if processed_root == root || mined_root == root => {
warn!("Root is mined - skipping");
return Ok(());
}
_ => (),
}

let now = Utc::now();
let root_age = now - root_state.pending_valid_as_of;
let root_age = if matches!(
root_state.status,
ProcessedStatus::Pending | ProcessedStatus::Processed
) {
now - root_state.pending_valid_as_of
} else {
let mined_at = root_state
.mined_valid_as_of
.ok_or(ServerError::InvalidRoot)?;
now - mined_at
};

warn!("Root age: {root_age:?}");

Expand Down
6 changes: 3 additions & 3 deletions src/bin/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {

let response: InclusionProofResponse = response.json().await?;

let proof: Proof = response.0.proof.context("Missing proof")?;
let proof: Proof = response.proof.context("Missing proof")?;
let proof_serialized = serde_json::to_string_pretty(&proof)?;

if let Some(inclusion_proof_file) = args.inclusion_proof_file.as_ref() {
Expand Down Expand Up @@ -229,14 +229,14 @@ async fn main() -> anyhow::Result<()> {

let response: InclusionProofResponse = response.json().await?;

let root = response.0.root.context("Missing root")?;
let root = response.root.context("Missing root")?;

let nullifier_hash =
semaphore::protocol::generate_nullifier_hash(&identity, x.external_nullifier_hash);

let proof = semaphore::protocol::generate_proof(
&identity,
&response.0.proof.context("Missing proof")?,
&response.proof.context("Missing proof")?,
x.external_nullifier_hash,
x.signal_hash,
)?;
Expand Down
18 changes: 18 additions & 0 deletions src/database/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
Ok(result.into_iter().map(|(commitment,)| commitment).collect())
}

async fn get_unprocessed_commitment(self, commitment: &Hash) -> Result<Option<Hash>, Error> {
let mut conn = self.acquire().await?;

let result = sqlx::query(
r#"
SELECT commitment FROM unprocessed_identities WHERE commitment = $1
"#,
)
.bind(commitment)
.fetch_optional(&mut *conn)
.await?;

if let Some(row) = result {
return Ok(Some(row.get::<Hash, _>(0)));
};
Ok(None)
}

#[instrument(skip(self), level = "debug")]
async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> {
let mut conn = self.acquire().await?;
Expand Down
24 changes: 17 additions & 7 deletions src/identity/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::database::methods::DbMethods;
use crate::database::types::{BatchEntry, BatchType};
use crate::database::{Database, IsolationLevel};
use crate::ethereum::{Ethereum, ReadProvider};
use crate::identity_tree::{Canonical, Hash, TreeVersion, TreeWithNextVersion};
use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithNextVersion};
use crate::prover::identity::Identity;
use crate::prover::repository::ProverRepository;
use crate::prover::Prover;
Expand All @@ -32,7 +32,8 @@ pub trait IdentityProcessor: Send + Sync + 'static {

async fn finalize_identities(
&self,
processed_tree: &TreeVersion<Canonical>,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()>;

async fn await_clean_slate(&self) -> anyhow::Result<()>;
Expand Down Expand Up @@ -88,7 +89,8 @@ impl IdentityProcessor for OnChainIdentityProcessor {

async fn finalize_identities(
&self,
processed_tree: &TreeVersion<Canonical>,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
let mainnet_logs = self.fetch_mainnet_logs().await?;

Expand All @@ -98,7 +100,7 @@ impl IdentityProcessor for OnChainIdentityProcessor {
let mut roots = Self::extract_roots_from_mainnet_logs(mainnet_logs);
roots.extend(self.fetch_secondary_logs().await?);

self.finalize_secondary_roots(roots).await?;
self.finalize_secondary_roots(mined_tree, roots).await?;

Ok(())
}
Expand Down Expand Up @@ -399,7 +401,7 @@ impl OnChainIdentityProcessor {
#[instrument(level = "info", skip_all)]
async fn finalize_mainnet_roots(
&self,
processed_tree: &TreeVersion<Canonical>,
processed_tree: &TreeVersion<Intermediate>,
logs: &[Log],
) -> Result<(), anyhow::Error> {
for log in logs {
Expand Down Expand Up @@ -436,7 +438,11 @@ impl OnChainIdentityProcessor {
}

#[instrument(level = "info", skip_all)]
async fn finalize_secondary_roots(&self, roots: Vec<U256>) -> Result<(), anyhow::Error> {
async fn finalize_secondary_roots(
&self,
mined_tree: &TreeVersion<Canonical>,
roots: Vec<U256>,
) -> Result<(), anyhow::Error> {
for root in roots {
info!(?root, "Finalizing root");

Expand All @@ -456,6 +462,8 @@ impl OnChainIdentityProcessor {
tx.mark_root_as_mined(&root.into()).await?;
tx.commit().await?;

mined_tree.apply_updates_up_to(root.into());

info!(?root, "Root finalized");
}

Expand Down Expand Up @@ -510,7 +518,8 @@ impl IdentityProcessor for OffChainIdentityProcessor {

async fn finalize_identities(
&self,
processed_tree: &TreeVersion<Canonical>,
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
let batches = {
let mut committed_batches = self.committed_batches.lock().unwrap();
Expand All @@ -531,6 +540,7 @@ impl IdentityProcessor for OffChainIdentityProcessor {
tx.commit().await?;

processed_tree.apply_updates_up_to(batch.next_root);
mined_tree.apply_updates_up_to(batch.next_root);
}

Ok(())
Expand Down
Loading

0 comments on commit ccc8227

Please sign in to comment.