Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sequencing): validate streamed proposals #2305

Merged
merged 2 commits into from
Dec 3, 2024

Conversation

guy-starkware
Copy link
Contributor

This PR adds the ability of the Consensus/Context to accept streamed proposals and validate them.

@reviewable-StarkWare
Copy link

This change is Reviewable

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch 2 times, most recently from 18577f9 to 9cdaafe Compare November 27, 2024 15:28
Copy link

github-actions bot commented Nov 27, 2024

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 9cdaafe to f14b4d1 Compare November 27, 2024 15:42
Copy link

codecov bot commented Nov 27, 2024

Codecov Report

Attention: Patch coverage is 77.39130% with 26 lines in your changes missing coverage. Please review.

Project coverage is 14.19%. Comparing base (e3165c4) to head (dc17223).
Report is 664 commits behind head on main.

Files with missing lines Patch % Lines
...us_orchestrator/src/sequencer_consensus_context.rs 72.54% 13 Missing and 1 partial ⚠️
crates/sequencing/papyrus_consensus/src/manager.rs 80.00% 4 Missing ⚠️
...g/papyrus_consensus/src/single_height_consensus.rs 80.00% 3 Missing and 1 partial ⚠️
...nsus_orchestrator/src/papyrus_consensus_context.rs 83.33% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##             main    #2305       +/-   ##
===========================================
- Coverage   40.10%   14.19%   -25.92%     
===========================================
  Files          26      224      +198     
  Lines        1895    30457    +28562     
  Branches     1895    30457    +28562     
===========================================
+ Hits          760     4322     +3562     
- Misses       1100    25767    +24667     
- Partials       35      368      +333     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from f14b4d1 to f515d0c Compare November 28, 2024 09:22
@guy-starkware guy-starkware changed the base branch from main to guyn/streams/add_proposal_channel November 28, 2024 16:00
@guy-starkware guy-starkware force-pushed the guyn/streams/add_proposal_channel branch 4 times, most recently from 5f06879 to 4a79c7e Compare November 30, 2024 20:39
Copy link
Contributor

@matan-starkware matan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 11 of 11 files at r5, all commit messages.
Reviewable status: 11 of 19 files reviewed, 17 unresolved discussions (waiting on @guy-starkware)


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 237 at r5 (raw file):

    // Receive a valid proposal.
    let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
    let txs = vec![generate_invoke_tx(Felt::TWO)]

Just change the return type of this

Code quote:

generate_invoke_tx

crates/sequencing/papyrus_consensus/src/test_utils.rs line 33 at r5 (raw file):

    fn try_from(part: MockProposalPart) -> Result<Self, Self::Error> {
        Ok(ProposalInit {
            height: BlockNumber(part.0 as u64),

Or redefine the type as u8

Suggestion:

            height: BlockNumber(part.0)

crates/sequencing/papyrus_consensus/src/test_utils.rs line 129 at r5 (raw file):

pub fn proposal_fin(block_felt: Felt) -> ProposalFin {
    ProposalFin { proposal_content_id: BlockHash(block_felt) }
}

Not enough boilerplate saved to be worth it.


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 73 at r5 (raw file):

    content_sender.send(MockProposalPart(1)).await.unwrap();
    // let (fin_sender, fin_receiver) = oneshot::channel();
    // fin_sender.send(BLOCK.id).unwrap();

should this be uncommented?

Code quote:

    // let (fin_sender, fin_receiver) = oneshot::channel();
    // fin_sender.send(BLOCK.id).unwrap();

crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 79 at r5 (raw file):

        PROPOSAL_INIT.clone(),
        content_receiver, /* Note: we are only sending the fin, there's no actual content.
                           * mpsc::channel(1).1, // content - ignored by SHC. */

Suggestion:

        content_receiver

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 190 at r5 (raw file):

                            panic!("Did not receive a Fin message");
                        }
                    }

Suggestion:

                    match content.next().await {
                        Some(ProposalPart::Transactions(batch)) => {
                            for tx in batch.transactions {
                                content_transactions.push(tx);
                            }
                        }
                        Some(ProposalPart::Fin(fin)) => {
                            break fin.proposal_content_id;
                        }
                        msg @ _ => panic!("unexpected message: {msg:?}"),
                    }

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 194 at r5 (raw file):

                // Check each transaction matches the transactions in the storage.
                for tx in transactions.iter().rev() {

Why reverse?

Code quote:

                for tx in transactions.iter().rev() {

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 201 at r5 (raw file):

                        panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}");
                    }
                }

Suggestion:

                for tx in transactions.iter().rev() {
                    let received_tx = content_transactions
                        .pop()
                        .expect("Received less transactions than expected");
                    if tx != &received_tx {
                        panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}");
                    }
                }
                assert!(
                    content_transactions.is_empty(),
                    "Received more transactions than expected"
                );

crates/sequencing/papyrus_consensus/src/manager_test.rs line 106 at r5 (raw file):

    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
    proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();

Can we move this implementation to when we actually support this? This test is ignored so this is useless here

Code quote:

    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
    proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::TWO))).await.unwrap();
    send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
    send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
    proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();

crates/sequencing/papyrus_consensus/src/manager_test.rs line 355 at r5 (raw file):

    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();

These 3 lnes repeat a lot, any helper function?

Code quote:

    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();

crates/sequencing/papyrus_consensus/src/types.rs line 79 at r5 (raw file):

        timeout: Duration,
        content: mpsc::Receiver<Self::ProposalPart>,
    ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

Suggestion:

    ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 402 at r5 (raw file):

                network_block_id = id; // Output this along with the ID from batcher, to compare them. 
                break;
            }

This is the only break point right? If so let's do id = while let above and break id; here.

Code quote:

            ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
                network_block_id = id; // Output this along with the ID from batcher, to compare them.
                break;
            }

crates/sequencing/papyrus_consensus/src/manager.rs line 45 at r5 (raw file):

where
    ContextT: ConsensusContext,
    <ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,

Want to just put this inside of types.rs?

Code quote:

std::fmt::Debug

crates/sequencing/papyrus_consensus/src/manager.rs line 147 at r5 (raw file):

                Some(mut content_receiver) = proposal_receiver.next() => {
                    // Get the first message to verify the init was sent.
                    // TODO(guyn): what happens if the channel never sends anything?

Add a timeout (another PR).

IIUC, the streaming component, should only send us this content_receiver once it already has the first message of the stream (message_id=0). This means there is an error in the Streaming component and we should panic.

Code quote:

                    // TODO(guyn): what happens if the channel never sends anything?

crates/sequencing/papyrus_consensus/src/manager.rs line 185 at r5 (raw file):

        <ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
    {
        // TODO(guyn): what is the right thing to do if proposal's height doesn't match?

You wrote the answer below, cache future heights.

Drop old heights

Code quote:

        // TODO(guyn): what is the right thing to do if proposal's height doesn't match?

crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 69 at r5 (raw file):

    /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an
    ///    event, which can be sent to the SM.
    ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),

Suggestion:

    ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalFin)>),

crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 116 at r5 (raw file):

            }
            ShcTask::ValidateProposal(init, block_receiver) => {
                let (block_proposal_id, network_proposal_id) = match block_receiver.await {

Rename throughtout. I want the names to hightlight that one of these is built locally from the content and the other is sent by the receiver (eventually it will be a signature)

Suggestion:

                let (built_content_id, received_proposal_fin) = match block_receiver.await {

@guy-starkware guy-starkware force-pushed the guyn/streams/add_proposal_channel branch 3 times, most recently from dbcada6 to bd0fbd4 Compare December 1, 2024 13:40
@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch 2 times, most recently from 6280ae8 to 0018cbc Compare December 1, 2024 16:35
@guy-starkware guy-starkware changed the title feat(consensus): validate streamed proposals feat(sequencing): validate streamed proposals Dec 2, 2024
@guy-starkware guy-starkware force-pushed the guyn/streams/add_proposal_channel branch 3 times, most recently from 2525648 to 871a959 Compare December 2, 2024 10:08
@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 0018cbc to 89628e8 Compare December 2, 2024 11:40
@guy-starkware guy-starkware force-pushed the guyn/streams/add_proposal_channel branch 2 times, most recently from 14b88ce to 6036394 Compare December 2, 2024 12:05
@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch 2 times, most recently from abab998 to 582f17b Compare December 2, 2024 14:41
Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 20 files reviewed, 17 unresolved discussions (waiting on @matan-starkware)


crates/sequencing/papyrus_consensus/src/manager.rs line 45 at r5 (raw file):

Previously, matan-starkware wrote…

Want to just put this inside of types.rs?

Good idea!


crates/sequencing/papyrus_consensus/src/manager.rs line 147 at r5 (raw file):

Previously, matan-starkware wrote…

Add a timeout (another PR).

IIUC, the streaming component, should only send us this content_receiver once it already has the first message of the stream (message_id=0). This means there is an error in the Streaming component and we should panic.

Do you want me to do anything in this PR? maybe update the TODO item?


crates/sequencing/papyrus_consensus/src/manager.rs line 185 at r5 (raw file):

Previously, matan-starkware wrote…

You wrote the answer below, cache future heights.

Drop old heights

Sorry, yeah I actually did this in the next PR, this is just a reminder for me.


crates/sequencing/papyrus_consensus/src/manager_test.rs line 106 at r5 (raw file):

Previously, matan-starkware wrote…

Can we move this implementation to when we actually support this? This test is ignored so this is useless here

Yeah, we can leave this to the next PR. Removing for now.


crates/sequencing/papyrus_consensus/src/manager_test.rs line 355 at r5 (raw file):

Previously, matan-starkware wrote…

These 3 lnes repeat a lot, any helper function?

Good idea.


crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 116 at r5 (raw file):

Previously, matan-starkware wrote…

Rename throughtout. I want the names to hightlight that one of these is built locally from the content and the other is sent by the receiver (eventually it will be a signature)

Done.


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 73 at r5 (raw file):

Previously, matan-starkware wrote…

should this be uncommented?

No, these are not used anymore. Removing.


crates/sequencing/papyrus_consensus/src/test_utils.rs line 33 at r5 (raw file):

Previously, matan-starkware wrote…

Or redefine the type as u8

Actually I think I've changed it to be u64 everywhere.


crates/sequencing/papyrus_consensus/src/test_utils.rs line 129 at r5 (raw file):

Previously, matan-starkware wrote…

Not enough boilerplate saved to be worth it.

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 194 at r5 (raw file):

Previously, matan-starkware wrote…

Why reverse?

Because pop() takes the last item in the vector. This is easier than poping from the front.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 402 at r5 (raw file):

Previously, matan-starkware wrote…

This is the only break point right? If so let's do id = while let above and break id; here.

Break with output value only works in loop.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 237 at r5 (raw file):

Previously, matan-starkware wrote…

Just change the return type of this

Done.


crates/sequencing/papyrus_consensus/src/single_height_consensus.rs line 69 at r5 (raw file):

    /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an
    ///    event, which can be sent to the SM.
    ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),

Done.


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 79 at r5 (raw file):

        PROPOSAL_INIT.clone(),
        content_receiver, /* Note: we are only sending the fin, there's no actual content.
                           * mpsc::channel(1).1, // content - ignored by SHC. */

Done.


crates/sequencing/papyrus_consensus/src/types.rs line 79 at r5 (raw file):

        timeout: Duration,
        content: mpsc::Receiver<Self::ProposalPart>,
    ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 190 at r5 (raw file):

                            panic!("Did not receive a Fin message");
                        }
                    }

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 201 at r5 (raw file):

                        panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}");
                    }
                }

Done.

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 582f17b to 300148e Compare December 2, 2024 15:08
@guy-starkware guy-starkware changed the base branch from guyn/streams/add_proposal_channel to main December 2, 2024 15:09
@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch 3 times, most recently from 61f236f to 4154e56 Compare December 2, 2024 16:23
Copy link
Contributor

@matan-starkware matan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 19 files at r8, 3 of 15 files at r9, 3 of 12 files at r10, 12 of 12 files at r12.
Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @guy-starkware)


crates/sequencing/papyrus_consensus/src/manager.rs line 147 at r5 (raw file):

Previously, guy-starkware wrote…

Do you want me to do anything in this PR? maybe update the TODO item?

Sure add a TODO to have a timeout and panic if not since that means streaming isn't working properly.

Can you just confirm for me that Streaming only sends this receiver once it has the first message in the stream, not just any message in the stream (out of order)?


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 34 at r12 (raw file):

    static ref VALIDATE_PROPOSAL_EVENT: ShcEvent = ShcEvent::ValidateProposal(
        StateMachineEvent::Proposal(Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round,),
        Some(ProposalFin { proposal_content_id: BLOCK.id }),

can you define PROPOSAL_FIN in the statics section? (use Here and below)

Code quote:

ProposalFin { proposal_content_id: BLOCK.id })

crates/sequencing/papyrus_consensus/src/manager_test.rs line 94 at r10 (raw file):

}

async fn proposal_send(

Suggestion:

async fn send_proposal(

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 307 at r12 (raw file):

        mock_register_broadcast_topic().expect("Failed to create mock network");
    let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
        subscriber_channels;

Unused now I think. Remove here and throughout the file.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 337 at r12 (raw file):

        context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    assert!(fin_receiver_past_round.await.is_err());

Suggestion:

    // Proposal parts sent in the proposals.
    let prop_part_txs = ProposalPart::Transactions(TransactionBatch {
        transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
        tx_hashes: vec![TX_BATCH[0].tx_hash()],
    });
    let prop_part_fin = ProposalPart::Fin(ProposalFin {
        proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
    });

    // The proposal from the past round is ignored.
    let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
    content_sender.send(prop_part_txs.clone()).await.unwrap();
    let fin_receiver_past_round =
        context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
    // Not fin sent and the channel remains open.
    assert!(fin_receiver_past_round.await.is_err());

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 354 at r12 (raw file):

        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

The need for the fin was caught by the loop break change I suggest above.

Suggestion:

    // The proposal from the current round should be validated.
    let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
    content_sender.send(prop_part_txs.clone()).await.unwrap();
    content_sender.send(prop_part_fin.clone()).await.unwrap();
    let fin_receiver_curr_round =
        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 371 at r12 (raw file):

        context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    assert!(fin_receiver_future_round.now_or_never().is_none());

Suggestion:

    // The proposal from the future round should not be processed.
    let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
    content_sender.send(prop_part_txs).await.unwrap();
    content_sender.send(prop_part_fin).await.unwrap();
    let fin_receiver_future_round =
        context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    // Even with sending fin and closing the channel.
    assert!(fin_receiver_future_round.now_or_never().is_none());

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 454 at r12 (raw file):

    content_sender_1.close_channel();
    // Move the context to the next round.
    context.set_height_and_round(BlockNumber(0), 1).await;

Suggestion:

    let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE);
    content_sender_1
        .send(ProposalPart::Transactions(TransactionBatch {
            transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
            tx_hashes: vec![TX_BATCH[0].tx_hash()],
        }))
        .await
        .unwrap();
    content_sender_1
        .send(ProposalPart::Fin(ProposalFin {
            proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
        }))
        .await
        .unwrap();
    let fin_receiver_1 =
        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
    // Move the context to the next round.
    context.set_height_and_round(BlockNumber(0), 1).await;

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 198 at r12 (raw file):

                        // Some(ProposalPart::Init(_)) => {
                        //     panic!("Should not have ProposalInit at this point");
                        // }

Remove

Code quote:

                        // Some(ProposalPart::Init(_)) => {
                        //     panic!("Should not have ProposalInit at this point");
                        // }

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 207 at r12 (raw file):

                            break fin.proposal_content_id;
                        }
                        mxs @ _ => panic!("Unexpected message: {mxs:?}"),

Suggestion:

                        msg @ _ => panic!("Unexpected message: {mxs:?}"),

crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 242 at r12 (raw file):

                // This can happen as a result of sync interrupting `run_height`.
                fin_sender
                    .send((block_hash, ProposalFin { proposal_content_id: received_block_hash }))

Now that we have streaming I think I'd like to remove the ProposalInit and ProposalFin definitions in the consensus crate and use the one in the proto crate. Can you help me remember this?

Code quote:

ProposalFin { proposal_content_id: received_block_hash })

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 75 at r12 (raw file):

    // Used to broadcast proposals to other consensus nodes.
    // TODO(Guy) switch to the actual streaming struct.
    _proposal_streaming_client: BroadcastTopicClient<ProposalPart>,

Why not remove this?

Code quote:

    // Used to broadcast proposals to other consensus nodes.
    // TODO(Guy) switch to the actual streaming struct.
    _proposal_streaming_client: BroadcastTopicClient<ProposalPart>,

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 517 at r12 (raw file):

            }
        }
    }

I realized we actually want to use a loop to enforce we are returning. With the while loop we were silently continuing if the receiver simply finished in the middle. Now we exit early.

Suggestion:

    let network_block_id = loop {
        let Some(prop_part) = content_receiver.next().await else {
            warn!("Failed to receive proposal content: {proposal_id:?}");
            return;
        };
        match prop_part {
            ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes }) => {
                let exe_txs: Vec<ExecutableTransaction> = txs
                    .into_iter()
                    .zip(tx_hashes.into_iter())
                    .map(|tx_tup| tx_tup.into())
                    .collect();
                content.extend_from_slice(&exe_txs[..]);
                let input = SendProposalContentInput {
                    proposal_id,
                    content: SendProposalContent::Txs(exe_txs),
                };
                let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
                    panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
                });
                match response.response {
                    ProposalStatus::Processing => {}
                    ProposalStatus::InvalidProposal => {
                        warn!("Proposal was invalid: {:?}", proposal_id);
                        return;
                    }
                    status @ _ => panic!("Unexpected status for {proposal_id:?}: {status:?}"),
                }
            }
            ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => break id,
            _ => panic!("Invalid proposal part: {:?}", prop_part),
        }
    };

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 534 at r12 (raw file):

            warn!("Proposal was invalid: {:?}", proposal_id);
            return;
        }

Suggestion:

        ProposalStatus::Finished(id) => id,
        ProposalStatus::InvalidProposal => {
            warn!("Proposal was invalid: {:?}", proposal_id);
            return;
        }
        status @ _ => panic!("Unexpected status for {proposal_id:?}: {status:?}"),

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 549 at r12 (raw file):

    // with `get_proposal` being called before `valid_proposals` is updated.
    let mut valid_proposals = valid_proposals.lock().unwrap();
    valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id));

I initially planned to do this in consensus, but may be better here since we are adding this to our state.

Suggestion:

    // Update valid_proposals before sending fin to avoid a race condition
    // with `get_proposal` being called before `valid_proposals` is updated.
    // TODO(Matan): Consider validating the ProposalFin signature here.
    let mut valid_proposals = valid_proposals.lock().unwrap();
    valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id));

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 4154e56 to 4a5deeb Compare December 2, 2024 22:10
Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @matan-starkware)


crates/sequencing/papyrus_consensus/src/manager.rs line 147 at r5 (raw file):

Previously, matan-starkware wrote…

Sure add a TODO to have a timeout and panic if not since that means streaming isn't working properly.

Can you just confirm for me that Streaming only sends this receiver once it has the first message in the stream, not just any message in the stream (out of order)?

Added a TODO.

It must be a message with id=0, but it isn't enforced by StreamHandler that it must be an Init (the content is not exposed to StreamHandler).


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 34 at r12 (raw file):

Previously, matan-starkware wrote…

can you define PROPOSAL_FIN in the statics section? (use Here and below)

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 198 at r12 (raw file):

Previously, matan-starkware wrote…

Remove

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 242 at r12 (raw file):

Previously, matan-starkware wrote…

Now that we have streaming I think I'd like to remove the ProposalInit and ProposalFin definitions in the consensus crate and use the one in the proto crate. Can you help me remember this?

I'm not sure what you mean. As far as I know, there is only the protobuf crate definitions. Maybe I need to look more...


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 75 at r12 (raw file):

Previously, matan-starkware wrote…

Why not remove this?

The plan is to remove the old plumbing in another PR (same way as I added the plumbing with nothing attached).


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 517 at r12 (raw file):

Previously, matan-starkware wrote…

I realized we actually want to use a loop to enforce we are returning. With the while loop we were silently continuing if the receiver simply finished in the middle. Now we exit early.

Yeah, I was getting bad vibes from this loop, but wasn't sure what the expected behavior should be.
Let's try it this way.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 549 at r12 (raw file):

Previously, matan-starkware wrote…

I initially planned to do this in consensus, but may be better here since we are adding this to our state.

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 307 at r12 (raw file):

Previously, matan-starkware wrote…

Unused now I think. Remove here and throughout the file.

I think this will have to go on another PR when I take out these channels.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 354 at r12 (raw file):

Previously, matan-starkware wrote…

The need for the fin was caught by the loop break change I suggest above.

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 534 at r12 (raw file):

            warn!("Proposal was invalid: {:?}", proposal_id);
            return;
        }

Done.


crates/sequencing/papyrus_consensus/src/manager_test.rs line 94 at r10 (raw file):

}

async fn proposal_send(

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 207 at r12 (raw file):

                            break fin.proposal_content_id;
                        }
                        mxs @ _ => panic!("Unexpected message: {mxs:?}"),

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 337 at r12 (raw file):

        context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    assert!(fin_receiver_past_round.await.is_err());

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 371 at r12 (raw file):

        context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
    content_sender.close_channel();
    assert!(fin_receiver_future_round.now_or_never().is_none());

Done.


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 454 at r12 (raw file):

    content_sender_1.close_channel();
    // Move the context to the next round.
    context.set_height_and_round(BlockNumber(0), 1).await;

Done.

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 4a5deeb to 568a63b Compare December 3, 2024 07:25
@matan-starkware matan-starkware self-requested a review December 3, 2024 07:45
Copy link
Contributor

@matan-starkware matan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 12 of 12 files at r16, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @guy-starkware)


crates/sequencing/papyrus_consensus/src/manager.rs line 147 at r5 (raw file):

Previously, guy-starkware wrote…

Added a TODO.

It must be a message with id=0, but it isn't enforced by StreamHandler that it must be an Init (the content is not exposed to StreamHandler).

Great, that's my understanding.


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 34 at r12 (raw file):

Previously, guy-starkware wrote…

Done.

Can you remove the derive?


crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs line 242 at r12 (raw file):

Previously, guy-starkware wrote…

I'm not sure what you mean. As far as I know, there is only the protobuf crate definitions. Maybe I need to look more...

I think you are correct and I misread my screen


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 534 at r12 (raw file):

Previously, guy-starkware wrote…

Done.

Good realization we don't need the name binding @


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 269 at r16 (raw file):

        proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
    });
    content_sender.send(prop_part).await.unwrap();

Inline, since only used once

Code quote:

    let prop_part = ProposalPart::Fin(ProposalFin {
        proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
    });
    content_sender.send(prop_part).await.unwrap();

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 363 at r16 (raw file):

        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;

    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

Suggestion:

    let fin_receiver_curr_round =
        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 480 at r16 (raw file):

        let Some(prop_part) = content_receiver.next().await else {
            warn!("Failed to receive proposal content: {proposal_id:?}");
            return;

Suggestion:

            // TODO(Asmaa): Tell the batcher to abort.
            warn!("Failed to receive proposal content: {proposal_id:?}");
            return;

crates/sequencing/papyrus_consensus/src/manager.rs line 146 at r16 (raw file):

                    // Get the first message to verify the init was sent.
                    // TODO(guyn): add a timeout and panic if nothing comes from StreamHandler
                    // (if it isn't sending things, that means something is wrong).

Suggestion:

                    // TODO(guyn): add a timeout and panic, since StreamHandler should only send once
                    // the first message (message_id=0) has arrived.

@guy-starkware guy-starkware force-pushed the guyn/streams/validate_streams branch from 568a63b to dc17223 Compare December 3, 2024 08:16
Copy link
Contributor Author

@guy-starkware guy-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @matan-starkware)


crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs line 34 at r12 (raw file):

Previously, matan-starkware wrote…

Can you remove the derive?

Yes, it seems like I can. Weird, it didn't compile without it earlier...


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs line 534 at r12 (raw file):

Previously, matan-starkware wrote…

Good realization we don't need the name binding @

That's code style CI that caught it :)


crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs line 363 at r16 (raw file):

        context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;

    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

Done.

@matan-starkware matan-starkware self-requested a review December 3, 2024 09:27
Copy link
Contributor

@matan-starkware matan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 12 of 12 files at r18, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @guy-starkware)

Copy link
Contributor

@matan-starkware matan-starkware left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @guy-starkware)

@guy-starkware guy-starkware merged commit a6a227b into main Dec 3, 2024
16 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Dec 5, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants