Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid lost attestations and sync committee messages with early subnet subscribe #1298

Merged
merged 9 commits into from
Sep 23, 2024
17 changes: 13 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,29 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
Libp2pPort.publish(topic, message)
end

@spec subscribe(non_neg_integer()) :: :ok
def subscribe(subnet_id),
do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)

@spec collect(non_neg_integer(), Types.Attestation.t()) :: :ok
def collect(subnet_id, attestation) do
join(subnet_id)
AttSubnetInfo.new_subnet_with_attestation(subnet_id, attestation)
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
subscribe(subnet_id)
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
@spec unsubscribe(non_neg_integer()) :: :ok
def unsubscribe(subnet_id) do
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
unsubscribe(subnet_id)
AttSubnetInfo.stop_collecting(subnet_id)
end

Expand Down
17 changes: 13 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,34 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
Libp2pPort.publish(topic, message)
end

@spec subscribe(non_neg_integer()) :: :ok
def subscribe(subnet_id),
do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)

@spec collect([non_neg_integer()], Types.SyncCommitteeMessage.t()) :: :ok
def collect(subnet_ids, message) do
join(subnet_ids)

for subnet_id <- subnet_ids do
SyncSubnetInfo.new_subnet_with_message(subnet_id, message)
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
subscribe(subnet_id)
end

:ok
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
@spec unsubscribe(non_neg_integer()) :: :ok
def unsubscribe(subnet_id) do
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
unsubscribe(subnet_id)
SyncSubnetInfo.stop_collecting(subnet_id)
end

Expand Down
88 changes: 77 additions & 11 deletions lib/lambda_ethereum_consensus/validator/duties.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
aggregation: [sync_committee_aggregator_duty()]
}

@typedoc "Set of subnet indices for a particular slot, used in attestations and sync committees."
@type subnets :: MapSet.t(subnet_index :: Types.uint64())

@typedoc "Useful precalculated data not tied to a particular slot/duty."
@type shared_data_for_duties :: %{sync_subcommittee_participants: %{}}
@type shared_data_for_duties :: %{
subnets: %{
attesters: %{Types.slot() => subnets},
sync_committees: %{Types.slot() => subnets}
},
sync_subcommittee_participants: %{}
}

@type attester_duties :: [attester_duty()]
@type proposer_duties :: [proposer_duty()]
Expand Down Expand Up @@ -103,11 +112,20 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
|> then(&{&1, compute_sync_subcommittee_participants(beacon, epoch)})
end

new_subnets_for_attestations = compute_subnets_for_attestations(new_attesters)
new_subnets_for_sync_committees = compute_subnets_for_sync_committees(new_sync_committees)

new_duties = %{
proposers: new_proposers,
attesters: new_attesters,
sync_committees: new_sync_committees,
shared: %{sync_subcommittee_participants: sync_subcommittee_participants}
shared: %{
subnets: %{
attesters: new_subnets_for_attestations,
sync_committees: new_subnets_for_sync_committees
},
sync_subcommittee_participants: sync_subcommittee_participants
}
}

log_duties_for_epoch(new_duties, epoch)
Expand Down Expand Up @@ -312,6 +330,25 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
Map.put(duty, :subnet_id, subnet_id)
end

defp compute_subnets_for_attestations(attester_duties) do
for {slot, duties} <- attester_duties,
%{should_aggregate?: true, committee_index: subnet_id} <- duties,
reduce: %{} do
acc ->
Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id))
end
end

defp compute_subnets_for_sync_committees(sync_committee_duties) do
for {slot, duties} <- sync_committee_duties,
%{aggregation: agg} when agg != [] <- duties,
%{subcommittee_index: subnet_id} <- agg,
reduce: %{} do
acc ->
Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id))
end
end

############################
# Accessors

Expand Down Expand Up @@ -350,6 +387,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
end
end

@spec current_subnets(duties(), Types.epoch(), Types.slot()) ::
%{attesters: subnets, sync_committees: subnets}
def current_subnets(duties, epoch, slot) do
%{
attesters: get_in(duties, [epoch, :shared, :subnets, :attesters, slot]) || MapSet.new(),
sync_committees:
get_in(duties, [epoch, :shared, :subnets, :sync_committees, max(0, slot - 1)]) ||
MapSet.new()
}
end

@spec sync_subcommittee_participants(duties(), Types.epoch()) :: %{
non_neg_integer() => [non_neg_integer()]
}
Expand Down Expand Up @@ -399,23 +447,41 @@ defmodule LambdaEthereumConsensus.Validator.Duties do

@spec log_duties_for_epoch(duties(), Types.epoch()) :: :ok
def log_duties_for_epoch(
%{proposers: proposers, attesters: attesters, sync_committees: sync_committees},
%{
proposers: proposers,
attesters: attesters,
sync_committees: sync_committees,
shared: shared
},
epoch
) do
Logger.info(
"[Duties] Proposers for epoch #{epoch} (slot=>validator):\n #{inspect(proposers)}"
)

for %{
subnet_ids: si,
validator_index: vi,
aggregation: agg
} <- sync_committees do
Logger.info(
"[Duties] Sync committee for epoch: #{epoch}, validator_index: #{vi} will broadcast on subnet_ids: #{inspect(si)}.\n Slots: #{inspect(agg |> Map.keys() |> Enum.join(", "))}"
)
Logger.debug(
"[Duties] SyncCommittees Subnets for epoch #{epoch}: #{inspect(shared.subnets.sync_committees)}"
)

for {slot, sync_duties} <- sync_committees,
length(sync_duties) > 0 do
Logger.debug("[Duties] Sync committee for epoch: #{epoch}, slot: #{slot}:")

for %{
validator_index: vi,
subnet_ids: si,
aggregation: agg
} <- sync_duties do
Logger.debug(
"[Duties] Validator: #{vi}, will broadcast in subnets: #{si} and aggregate in #{inspect(agg |> Enum.map(& &1.subcommittee_index))}."
)
end
end

Logger.debug(
"[Duties] Attesters Subnets for epoch #{epoch}: #{inspect(shared.subnets.attesters)}"
)

for {slot, att_duties} <- attesters,
length(att_duties) > 0 do
Logger.debug("[Duties] Attesters for epoch: #{epoch}, slot #{slot}:")
Expand Down
37 changes: 35 additions & 2 deletions lib/lambda_ethereum_consensus/validator/validator_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
simplify the delegation of work.
"""

defstruct slot: nil, head_root: nil, duties: %{}, validators: %{}
defstruct slot: nil,
head_root: nil,
duties: %{},
subscribed_subnets: %{attesters: MapSet.new(), sync_committees: MapSet.new()},
validators: %{}

require Logger

alias LambdaEthereumConsensus.P2P.Gossip.Attestation
alias LambdaEthereumConsensus.P2P.Gossip.SyncCommittee
alias LambdaEthereumConsensus.StateTransition
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Store.CheckpointStates
Expand All @@ -21,6 +27,7 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
slot: Types.slot(),
head_root: Types.root() | nil,
duties: %{Types.epoch() => Duties.duties()},
subscribed_subnets: %{attesters: Duties.subnets(), sync_committees: Duties.subnets()},
validators: validators()
}

Expand Down Expand Up @@ -137,7 +144,9 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
end

defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}) do
maybe_propose(set, epoch, slot, head_root)
set
|> maybe_resubscribe_to_subnets(epoch, slot)
|> maybe_propose(epoch, slot, head_root)
end

defp process_tick(%{head_root: head_root} = set, epoch, {slot, :second_third}) do
Expand Down Expand Up @@ -325,6 +334,30 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
|> then(&%{set | duties: &1})
end

##########################
# Subnets

defp maybe_resubscribe_to_subnets(set, epoch, slot) do
%{subscribed_subnets: %{attesters: old_att_subnets, sync_committees: old_sync_subnets}} = set

%{attesters: new_att_subnets, sync_committees: new_sync_subnets} =
Duties.current_subnets(set.duties, epoch, slot)

unsubscribe_att = MapSet.difference(old_att_subnets, new_att_subnets)
unsubscribe_sync = MapSet.difference(old_sync_subnets, new_sync_subnets)

Enum.each(unsubscribe_att, &Attestation.unsubscribe/1)
Enum.each(unsubscribe_sync, &SyncCommittee.unsubscribe/1)

subscribe_att = MapSet.difference(new_att_subnets, old_att_subnets)
subscribe_sync = MapSet.difference(new_sync_subnets, old_sync_subnets)

Enum.each(subscribe_att, &Attestation.subscribe/1)
Enum.each(subscribe_sync, &SyncCommittee.subscribe/1)

%{set | subscribed_subnets: %{attesters: new_att_subnets, sync_committees: new_sync_subnets}}
end

##########################
# Target State
# TODO: (#1278) This should be taken from the store as noted by arkenan.
Expand Down
16 changes: 5 additions & 11 deletions lib/types/att_subnet_info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ defmodule Types.AttSubnetInfo do

@doc """
Adds a new Attestation to the SubnetInfo if the attestation's data matches the base one.
Assumes that the SubnetInfo already exists.
"""
@spec add_attestation!(non_neg_integer(), Types.Attestation.t()) :: :ok
def add_attestation!(subnet_id, attestation) do
subnet_info = fetch_subnet_info!(subnet_id)

if subnet_info.data == attestation.data do
def add_attestation!(subnet_id, %{data: att_data} = attestation) do
# TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful
# messages and end up with empty aggregations due to the subnet not being created yet.
with {:ok, subnet_info} <- fetch_subnet_info(subnet_id),
^att_data <- subnet_info.data do
new_subnet_info = %__MODULE__{
subnet_info
| attestations: [attestation | subnet_info.attestations]
Expand Down Expand Up @@ -91,12 +91,6 @@ defmodule Types.AttSubnetInfo do
end
end

@spec fetch_subnet_info!(non_neg_integer()) :: t()
defp fetch_subnet_info!(subnet_id) do
{:ok, subnet_info} = fetch_subnet_info(subnet_id)
subnet_info
end

@spec delete_subnet(non_neg_integer()) :: :ok
defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))

Expand Down
19 changes: 6 additions & 13 deletions lib/types/sync_subnet_info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ defmodule Types.SyncSubnetInfo do

@doc """
Adds a new SyncCommitteeMessage to the SubnetInfo if the message's data matches the base one.
Assumes that the SubnetInfo already exists.
"""
@spec add_message!(non_neg_integer(), Types.SyncCommitteeMessage.t()) :: :ok
def add_message!(
subnet_id,
%Types.SyncCommitteeMessage{slot: slot, beacon_block_root: root} = message
) do
subnet_info = fetch_subnet_info!(subnet_id)
def add_message!(subnet_id, %Types.SyncCommitteeMessage{} = message) do
%{slot: slot, beacon_block_root: root} = message

if subnet_info.data == {slot, root} do
# TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful
# messages and end up with empty aggregations due to the subnet not being created yet.
with {:ok, subnet_info} <- fetch_subnet_info(subnet_id),
{^slot, ^root} <- subnet_info.data do
new_subnet_info = %__MODULE__{
subnet_info
| messages: [message | subnet_info.messages]
Expand Down Expand Up @@ -100,12 +99,6 @@ defmodule Types.SyncSubnetInfo do
end
end

@spec fetch_subnet_info!(non_neg_integer()) :: t()
defp fetch_subnet_info!(subnet_id) do
{:ok, subnet_info} = fetch_subnet_info(subnet_id)
subnet_info
end

@spec delete_subnet(non_neg_integer()) :: :ok
defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))

Expand Down
Loading