-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sequencing): validate streamed proposals #2305
Conversation
18577f9
to
9cdaafe
Compare
Artifacts upload workflows: |
9cdaafe
to
f14b4d1
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2305 +/- ##
===========================================
- Coverage 40.10% 14.19% -25.92%
===========================================
Files 26 224 +198
Lines 1895 30457 +28562
Branches 1895 30457 +28562
===========================================
+ Hits 760 4322 +3562
- Misses 1100 25767 +24667
- Partials 35 368 +333 ☔ View full report in Codecov by Sentry. |
f14b4d1
to
f515d0c
Compare
5f06879
to
4a79c7e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 11 of 11 files at r5, all commit messages.
Reviewable status: 11 of 19 files reviewed, 17 unresolved discussions (waiting on @guy-starkware)
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 237 at r5 (raw file):
// Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); let txs = vec![generate_invoke_tx(Felt::TWO)]
Just change the return type of this
Code quote:
generate_invoke_tx
crates/sequencing/papyrus_consensus/src/test_utils.rs
line 33 at r5 (raw file):
fn try_from(part: MockProposalPart) -> Result<Self, Self::Error> { Ok(ProposalInit { height: BlockNumber(part.0 as u64),
Or redefine the type as u8
Suggestion:
height: BlockNumber(part.0)
crates/sequencing/papyrus_consensus/src/test_utils.rs
line 129 at r5 (raw file):
pub fn proposal_fin(block_felt: Felt) -> ProposalFin { ProposalFin { proposal_content_id: BlockHash(block_felt) } }
Not enough boilerplate saved to be worth it.
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 73 at r5 (raw file):
content_sender.send(MockProposalPart(1)).await.unwrap(); // let (fin_sender, fin_receiver) = oneshot::channel(); // fin_sender.send(BLOCK.id).unwrap();
should this be uncommented?
Code quote:
// let (fin_sender, fin_receiver) = oneshot::channel();
// fin_sender.send(BLOCK.id).unwrap();
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 79 at r5 (raw file):
PROPOSAL_INIT.clone(), content_receiver, /* Note: we are only sending the fin, there's no actual content. * mpsc::channel(1).1, // content - ignored by SHC. */
Suggestion:
content_receiver
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 190 at r5 (raw file):
panic!("Did not receive a Fin message"); } }
Suggestion:
match content.next().await {
Some(ProposalPart::Transactions(batch)) => {
for tx in batch.transactions {
content_transactions.push(tx);
}
}
Some(ProposalPart::Fin(fin)) => {
break fin.proposal_content_id;
}
msg @ _ => panic!("unexpected message: {msg:?}"),
}
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 194 at r5 (raw file):
// Check each transaction matches the transactions in the storage. for tx in transactions.iter().rev() {
Why reverse?
Code quote:
for tx in transactions.iter().rev() {
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 201 at r5 (raw file):
panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}"); } }
Suggestion:
for tx in transactions.iter().rev() {
let received_tx = content_transactions
.pop()
.expect("Received less transactions than expected");
if tx != &received_tx {
panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}");
}
}
assert!(
content_transactions.is_empty(),
"Received more transactions than expected"
);
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 106 at r5 (raw file):
proposal_receiver_sender.send(proposal_receiver).await.unwrap(); proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap(); proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();
Can we move this implementation to when we actually support this? This test is ignored so this is useless here
Code quote:
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::TWO))).await.unwrap();
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 355 at r5 (raw file):
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); proposal_receiver_sender.send(proposal_receiver).await.unwrap(); proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();
These 3 lnes repeat a lot, any helper function?
Code quote:
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();
crates/sequencing/papyrus_consensus/src/types.rs
line 79 at r5 (raw file):
timeout: Duration, content: mpsc::Receiver<Self::ProposalPart>, ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;
Suggestion:
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 402 at r5 (raw file):
network_block_id = id; // Output this along with the ID from batcher, to compare them. break; }
This is the only break point right? If so let's do id = while let
above and break id;
here.
Code quote:
ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
network_block_id = id; // Output this along with the ID from batcher, to compare them.
break;
}
crates/sequencing/papyrus_consensus/src/manager.rs
line 45 at r5 (raw file):
where ContextT: ConsensusContext, <ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
Want to just put this inside of types.rs
?
Code quote:
std::fmt::Debug
crates/sequencing/papyrus_consensus/src/manager.rs
line 147 at r5 (raw file):
Some(mut content_receiver) = proposal_receiver.next() => { // Get the first message to verify the init was sent. // TODO(guyn): what happens if the channel never sends anything?
Add a timeout (another PR).
IIUC, the streaming component, should only send us this content_receiver
once it already has the first message of the stream (message_id=0). This means there is an error in the Streaming component and we should panic.
Code quote:
// TODO(guyn): what happens if the channel never sends anything?
crates/sequencing/papyrus_consensus/src/manager.rs
line 185 at r5 (raw file):
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug, { // TODO(guyn): what is the right thing to do if proposal's height doesn't match?
You wrote the answer below, cache future heights.
Drop old heights
Code quote:
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 69 at r5 (raw file):
/// 3. Once validation is complete, the manager returns the built proposal to the SHC as an /// event, which can be sent to the SM. ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),
Suggestion:
ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalFin)>),
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 116 at r5 (raw file):
} ShcTask::ValidateProposal(init, block_receiver) => { let (block_proposal_id, network_proposal_id) = match block_receiver.await {
Rename throughtout. I want the names to hightlight that one of these is built locally from the content and the other is sent by the receiver (eventually it will be a signature)
Suggestion:
let (built_content_id, received_proposal_fin) = match block_receiver.await {
dbcada6
to
bd0fbd4
Compare
6280ae8
to
0018cbc
Compare
2525648
to
871a959
Compare
0018cbc
to
89628e8
Compare
14b88ce
to
6036394
Compare
abab998
to
582f17b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 20 files reviewed, 17 unresolved discussions (waiting on @matan-starkware)
crates/sequencing/papyrus_consensus/src/manager.rs
line 45 at r5 (raw file):
Previously, matan-starkware wrote…
Want to just put this inside of
types.rs
?
Good idea!
crates/sequencing/papyrus_consensus/src/manager.rs
line 147 at r5 (raw file):
Previously, matan-starkware wrote…
Add a timeout (another PR).
IIUC, the streaming component, should only send us this
content_receiver
once it already has the first message of the stream (message_id=0). This means there is an error in the Streaming component and we should panic.
Do you want me to do anything in this PR? maybe update the TODO item?
crates/sequencing/papyrus_consensus/src/manager.rs
line 185 at r5 (raw file):
Previously, matan-starkware wrote…
You wrote the answer below, cache future heights.
Drop old heights
Sorry, yeah I actually did this in the next PR, this is just a reminder for me.
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 106 at r5 (raw file):
Previously, matan-starkware wrote…
Can we move this implementation to when we actually support this? This test is ignored so this is useless here
Yeah, we can leave this to the next PR. Removing for now.
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 355 at r5 (raw file):
Previously, matan-starkware wrote…
These 3 lnes repeat a lot, any helper function?
Good idea.
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 116 at r5 (raw file):
Previously, matan-starkware wrote…
Rename throughtout. I want the names to hightlight that one of these is built locally from the content and the other is sent by the receiver (eventually it will be a signature)
Done.
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 73 at r5 (raw file):
Previously, matan-starkware wrote…
should this be uncommented?
No, these are not used anymore. Removing.
crates/sequencing/papyrus_consensus/src/test_utils.rs
line 33 at r5 (raw file):
Previously, matan-starkware wrote…
Or redefine the type as u8
Actually I think I've changed it to be u64
everywhere.
crates/sequencing/papyrus_consensus/src/test_utils.rs
line 129 at r5 (raw file):
Previously, matan-starkware wrote…
Not enough boilerplate saved to be worth it.
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 194 at r5 (raw file):
Previously, matan-starkware wrote…
Why reverse?
Because pop()
takes the last item in the vector. This is easier than poping from the front.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 402 at r5 (raw file):
Previously, matan-starkware wrote…
This is the only break point right? If so let's do
id = while let
above andbreak id;
here.
Break with output value only works in loop
.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 237 at r5 (raw file):
Previously, matan-starkware wrote…
Just change the return type of this
Done.
crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
line 69 at r5 (raw file):
/// 3. Once validation is complete, the manager returns the built proposal to the SHC as an /// event, which can be sent to the SM. ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),
Done.
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 79 at r5 (raw file):
PROPOSAL_INIT.clone(), content_receiver, /* Note: we are only sending the fin, there's no actual content. * mpsc::channel(1).1, // content - ignored by SHC. */
Done.
crates/sequencing/papyrus_consensus/src/types.rs
line 79 at r5 (raw file):
timeout: Duration, content: mpsc::Receiver<Self::ProposalPart>, ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 190 at r5 (raw file):
panic!("Did not receive a Fin message"); } }
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 201 at r5 (raw file):
panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}"); } }
Done.
582f17b
to
300148e
Compare
61f236f
to
4154e56
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 19 files at r8, 3 of 15 files at r9, 3 of 12 files at r10, 12 of 12 files at r12.
Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @guy-starkware)
crates/sequencing/papyrus_consensus/src/manager.rs
line 147 at r5 (raw file):
Previously, guy-starkware wrote…
Do you want me to do anything in this PR? maybe update the TODO item?
Sure add a TODO to have a timeout and panic if not since that means streaming isn't working properly.
Can you just confirm for me that Streaming only sends this receiver once it has the first message in the stream, not just any message in the stream (out of order)?
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 34 at r12 (raw file):
static ref VALIDATE_PROPOSAL_EVENT: ShcEvent = ShcEvent::ValidateProposal( StateMachineEvent::Proposal(Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round,), Some(ProposalFin { proposal_content_id: BLOCK.id }),
can you define PROPOSAL_FIN in the statics section? (use Here and below)
Code quote:
ProposalFin { proposal_content_id: BLOCK.id })
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 94 at r10 (raw file):
} async fn proposal_send(
Suggestion:
async fn send_proposal(
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 307 at r12 (raw file):
mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels;
Unused now I think. Remove here and throughout the file.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 337 at r12 (raw file):
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; content_sender.close_channel(); assert!(fin_receiver_past_round.await.is_err());
Suggestion:
// Proposal parts sent in the proposals.
let prop_part_txs = ProposalPart::Transactions(TransactionBatch {
transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
tx_hashes: vec![TX_BATCH[0].tx_hash()],
});
let prop_part_fin = ProposalPart::Fin(ProposalFin {
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
});
// The proposal from the past round is ignored.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
let fin_receiver_past_round =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
// Not fin sent and the channel remains open.
assert!(fin_receiver_past_round.await.is_err());
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 354 at r12 (raw file):
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await; content_sender.close_channel(); assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
The need for the fin was caught by the loop break
change I suggest above.
Suggestion:
// The proposal from the current round should be validated.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_curr_round =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 371 at r12 (raw file):
context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await; content_sender.close_channel(); assert!(fin_receiver_future_round.now_or_never().is_none());
Suggestion:
// The proposal from the future round should not be processed.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs).await.unwrap();
content_sender.send(prop_part_fin).await.unwrap();
let fin_receiver_future_round =
context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
content_sender.close_channel();
// Even with sending fin and closing the channel.
assert!(fin_receiver_future_round.now_or_never().is_none());
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 454 at r12 (raw file):
content_sender_1.close_channel(); // Move the context to the next round. context.set_height_and_round(BlockNumber(0), 1).await;
Suggestion:
let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender_1
.send(ProposalPart::Transactions(TransactionBatch {
transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
tx_hashes: vec![TX_BATCH[0].tx_hash()],
}))
.await
.unwrap();
content_sender_1
.send(ProposalPart::Fin(ProposalFin {
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
}))
.await
.unwrap();
let fin_receiver_1 =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
// Move the context to the next round.
context.set_height_and_round(BlockNumber(0), 1).await;
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 198 at r12 (raw file):
// Some(ProposalPart::Init(_)) => { // panic!("Should not have ProposalInit at this point"); // }
Remove
Code quote:
// Some(ProposalPart::Init(_)) => {
// panic!("Should not have ProposalInit at this point");
// }
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 207 at r12 (raw file):
break fin.proposal_content_id; } mxs @ _ => panic!("Unexpected message: {mxs:?}"),
Suggestion:
msg @ _ => panic!("Unexpected message: {mxs:?}"),
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 242 at r12 (raw file):
// This can happen as a result of sync interrupting `run_height`. fin_sender .send((block_hash, ProposalFin { proposal_content_id: received_block_hash }))
Now that we have streaming I think I'd like to remove the ProposalInit and ProposalFin definitions in the consensus crate and use the one in the proto crate. Can you help me remember this?
Code quote:
ProposalFin { proposal_content_id: received_block_hash })
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 75 at r12 (raw file):
// Used to broadcast proposals to other consensus nodes. // TODO(Guy) switch to the actual streaming struct. _proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
Why not remove this?
Code quote:
// Used to broadcast proposals to other consensus nodes.
// TODO(Guy) switch to the actual streaming struct.
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 517 at r12 (raw file):
} } }
I realized we actually want to use a loop to enforce we are returning. With the while loop we were silently continuing if the receiver simply finished in the middle. Now we exit early.
Suggestion:
let network_block_id = loop {
let Some(prop_part) = content_receiver.next().await else {
warn!("Failed to receive proposal content: {proposal_id:?}");
return;
};
match prop_part {
ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes }) => {
let exe_txs: Vec<ExecutableTransaction> = txs
.into_iter()
.zip(tx_hashes.into_iter())
.map(|tx_tup| tx_tup.into())
.collect();
content.extend_from_slice(&exe_txs[..]);
let input = SendProposalContentInput {
proposal_id,
content: SendProposalContent::Txs(exe_txs),
};
let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
});
match response.response {
ProposalStatus::Processing => {}
ProposalStatus::InvalidProposal => {
warn!("Proposal was invalid: {:?}", proposal_id);
return;
}
status @ _ => panic!("Unexpected status for {proposal_id:?}: {status:?}"),
}
}
ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => break id,
_ => panic!("Invalid proposal part: {:?}", prop_part),
}
};
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 534 at r12 (raw file):
warn!("Proposal was invalid: {:?}", proposal_id); return; }
Suggestion:
ProposalStatus::Finished(id) => id,
ProposalStatus::InvalidProposal => {
warn!("Proposal was invalid: {:?}", proposal_id);
return;
}
status @ _ => panic!("Unexpected status for {proposal_id:?}: {status:?}"),
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 549 at r12 (raw file):
// with `get_proposal` being called before `valid_proposals` is updated. let mut valid_proposals = valid_proposals.lock().unwrap(); valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id));
I initially planned to do this in consensus, but may be better here since we are adding this to our state.
Suggestion:
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// TODO(Matan): Consider validating the ProposalFin signature here.
let mut valid_proposals = valid_proposals.lock().unwrap();
valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id));
4154e56
to
4a5deeb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @matan-starkware)
crates/sequencing/papyrus_consensus/src/manager.rs
line 147 at r5 (raw file):
Previously, matan-starkware wrote…
Sure add a TODO to have a timeout and panic if not since that means streaming isn't working properly.
Can you just confirm for me that Streaming only sends this receiver once it has the first message in the stream, not just any message in the stream (out of order)?
Added a TODO.
It must be a message with id=0, but it isn't enforced by StreamHandler that it must be an Init (the content is not exposed to StreamHandler).
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 34 at r12 (raw file):
Previously, matan-starkware wrote…
can you define PROPOSAL_FIN in the statics section? (use Here and below)
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 198 at r12 (raw file):
Previously, matan-starkware wrote…
Remove
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 242 at r12 (raw file):
Previously, matan-starkware wrote…
Now that we have streaming I think I'd like to remove the ProposalInit and ProposalFin definitions in the consensus crate and use the one in the proto crate. Can you help me remember this?
I'm not sure what you mean. As far as I know, there is only the protobuf crate definitions. Maybe I need to look more...
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 75 at r12 (raw file):
Previously, matan-starkware wrote…
Why not remove this?
The plan is to remove the old plumbing in another PR (same way as I added the plumbing with nothing attached).
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 517 at r12 (raw file):
Previously, matan-starkware wrote…
I realized we actually want to use a loop to enforce we are returning. With the while loop we were silently continuing if the receiver simply finished in the middle. Now we exit early.
Yeah, I was getting bad vibes from this loop, but wasn't sure what the expected behavior should be.
Let's try it this way.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 549 at r12 (raw file):
Previously, matan-starkware wrote…
I initially planned to do this in consensus, but may be better here since we are adding this to our state.
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 307 at r12 (raw file):
Previously, matan-starkware wrote…
Unused now I think. Remove here and throughout the file.
I think this will have to go on another PR when I take out these channels.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 354 at r12 (raw file):
Previously, matan-starkware wrote…
The need for the fin was caught by the
loop break
change I suggest above.
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 534 at r12 (raw file):
warn!("Proposal was invalid: {:?}", proposal_id); return; }
Done.
crates/sequencing/papyrus_consensus/src/manager_test.rs
line 94 at r10 (raw file):
} async fn proposal_send(
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 207 at r12 (raw file):
break fin.proposal_content_id; } mxs @ _ => panic!("Unexpected message: {mxs:?}"),
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 337 at r12 (raw file):
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; content_sender.close_channel(); assert!(fin_receiver_past_round.await.is_err());
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 371 at r12 (raw file):
context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await; content_sender.close_channel(); assert!(fin_receiver_future_round.now_or_never().is_none());
Done.
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 454 at r12 (raw file):
content_sender_1.close_channel(); // Move the context to the next round. context.set_height_and_round(BlockNumber(0), 1).await;
Done.
4a5deeb
to
568a63b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 12 files at r16, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @guy-starkware)
crates/sequencing/papyrus_consensus/src/manager.rs
line 147 at r5 (raw file):
Previously, guy-starkware wrote…
Added a TODO.
It must be a message with id=0, but it isn't enforced by StreamHandler that it must be an Init (the content is not exposed to StreamHandler).
Great, that's my understanding.
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 34 at r12 (raw file):
Previously, guy-starkware wrote…
Done.
Can you remove the derive?
crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
line 242 at r12 (raw file):
Previously, guy-starkware wrote…
I'm not sure what you mean. As far as I know, there is only the protobuf crate definitions. Maybe I need to look more...
I think you are correct and I misread my screen
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 534 at r12 (raw file):
Previously, guy-starkware wrote…
Done.
Good realization we don't need the name binding @
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 269 at r16 (raw file):
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0), }); content_sender.send(prop_part).await.unwrap();
Inline, since only used once
Code quote:
let prop_part = ProposalPart::Fin(ProposalFin {
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
});
content_sender.send(prop_part).await.unwrap();
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 363 at r16 (raw file):
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await; assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
Suggestion:
let fin_receiver_curr_round =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 480 at r16 (raw file):
let Some(prop_part) = content_receiver.next().await else { warn!("Failed to receive proposal content: {proposal_id:?}"); return;
Suggestion:
// TODO(Asmaa): Tell the batcher to abort.
warn!("Failed to receive proposal content: {proposal_id:?}");
return;
crates/sequencing/papyrus_consensus/src/manager.rs
line 146 at r16 (raw file):
// Get the first message to verify the init was sent. // TODO(guyn): add a timeout and panic if nothing comes from StreamHandler // (if it isn't sending things, that means something is wrong).
Suggestion:
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// the first message (message_id=0) has arrived.
568a63b
to
dc17223
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @matan-starkware)
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
line 34 at r12 (raw file):
Previously, matan-starkware wrote…
Can you remove the derive?
Yes, it seems like I can. Weird, it didn't compile without it earlier...
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
line 534 at r12 (raw file):
Previously, matan-starkware wrote…
Good realization we don't need the name binding
@
That's code style CI that caught it :)
crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
line 363 at r16 (raw file):
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await; assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 12 files at r18, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @guy-starkware)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @guy-starkware)
This PR adds the ability of the Consensus/Context to accept streamed proposals and validate them.