diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex index 223db2c07..110dd57f0 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex @@ -67,20 +67,29 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do Libp2pPort.publish(topic, message) end + @spec subscribe(non_neg_integer()) :: :ok + def subscribe(subnet_id), + do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__) + @spec collect(non_neg_integer(), Types.Attestation.t()) :: :ok def collect(subnet_id, attestation) do join(subnet_id) AttSubnetInfo.new_subnet_with_attestation(subnet_id, attestation) - Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__) + subscribe(subnet_id) end - @spec stop_collecting(non_neg_integer()) :: - {:ok, list(Types.Attestation.t())} | {:error, String.t()} - def stop_collecting(subnet_id) do + @spec unsubscribe(non_neg_integer()) :: :ok + def unsubscribe(subnet_id) do # TODO: (#1289) implement some way to unsubscribe without leaving the topic topic = topic(subnet_id) Libp2pPort.leave_topic(topic) Libp2pPort.join_topic(topic) + end + + @spec stop_collecting(non_neg_integer()) :: + {:ok, list(Types.Attestation.t())} | {:error, String.t()} + def stop_collecting(subnet_id) do + unsubscribe(subnet_id) AttSubnetInfo.stop_collecting(subnet_id) end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex b/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex index 5e6e5c15b..fb5ff6fba 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex @@ -72,25 +72,34 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do Libp2pPort.publish(topic, message) end + @spec subscribe(non_neg_integer()) :: :ok + def subscribe(subnet_id), + do: Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__) + @spec collect([non_neg_integer()], Types.SyncCommitteeMessage.t()) :: :ok def collect(subnet_ids, message) do join(subnet_ids) for subnet_id <- subnet_ids do SyncSubnetInfo.new_subnet_with_message(subnet_id, message) - Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__) + subscribe(subnet_id) end :ok end - @spec stop_collecting(non_neg_integer()) :: - {:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()} - def stop_collecting(subnet_id) do + @spec unsubscribe(non_neg_integer()) :: :ok + def unsubscribe(subnet_id) do # TODO: (#1289) implement some way to unsubscribe without leaving the topic topic = topic(subnet_id) Libp2pPort.leave_topic(topic) Libp2pPort.join_topic(topic) + end + + @spec stop_collecting(non_neg_integer()) :: + {:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()} + def stop_collecting(subnet_id) do + unsubscribe(subnet_id) SyncSubnetInfo.stop_collecting(subnet_id) end diff --git a/lib/lambda_ethereum_consensus/validator/duties.ex b/lib/lambda_ethereum_consensus/validator/duties.ex index 8867ccf97..9b0d3ec3a 100644 --- a/lib/lambda_ethereum_consensus/validator/duties.ex +++ b/lib/lambda_ethereum_consensus/validator/duties.ex @@ -41,8 +41,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do aggregation: [sync_committee_aggregator_duty()] } + @typedoc "Set of subnet indices for a particular slot, used in attestations and sync committees." + @type subnets :: MapSet.t(subnet_index :: Types.uint64()) + @typedoc "Useful precalculated data not tied to a particular slot/duty." - @type shared_data_for_duties :: %{sync_subcommittee_participants: %{}} + @type shared_data_for_duties :: %{ + subnets: %{ + attesters: %{Types.slot() => subnets}, + sync_committees: %{Types.slot() => subnets} + }, + sync_subcommittee_participants: %{} + } @type attester_duties :: [attester_duty()] @type proposer_duties :: [proposer_duty()] @@ -103,11 +112,20 @@ defmodule LambdaEthereumConsensus.Validator.Duties do |> then(&{&1, compute_sync_subcommittee_participants(beacon, epoch)}) end + new_subnets_for_attestations = compute_subnets_for_attestations(new_attesters) + new_subnets_for_sync_committees = compute_subnets_for_sync_committees(new_sync_committees) + new_duties = %{ proposers: new_proposers, attesters: new_attesters, sync_committees: new_sync_committees, - shared: %{sync_subcommittee_participants: sync_subcommittee_participants} + shared: %{ + subnets: %{ + attesters: new_subnets_for_attestations, + sync_committees: new_subnets_for_sync_committees + }, + sync_subcommittee_participants: sync_subcommittee_participants + } } log_duties_for_epoch(new_duties, epoch) @@ -312,6 +330,25 @@ defmodule LambdaEthereumConsensus.Validator.Duties do Map.put(duty, :subnet_id, subnet_id) end + defp compute_subnets_for_attestations(attester_duties) do + for {slot, duties} <- attester_duties, + %{should_aggregate?: true, committee_index: subnet_id} <- duties, + reduce: %{} do + acc -> + Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id)) + end + end + + defp compute_subnets_for_sync_committees(sync_committee_duties) do + for {slot, duties} <- sync_committee_duties, + %{aggregation: agg} when agg != [] <- duties, + %{subcommittee_index: subnet_id} <- agg, + reduce: %{} do + acc -> + Map.update(acc, slot, MapSet.new([subnet_id]), &MapSet.put(&1, subnet_id)) + end + end + ############################ # Accessors @@ -350,6 +387,17 @@ defmodule LambdaEthereumConsensus.Validator.Duties do end end + @spec current_subnets(duties(), Types.epoch(), Types.slot()) :: + %{attesters: subnets, sync_committees: subnets} + def current_subnets(duties, epoch, slot) do + %{ + attesters: get_in(duties, [epoch, :shared, :subnets, :attesters, slot]) || MapSet.new(), + sync_committees: + get_in(duties, [epoch, :shared, :subnets, :sync_committees, max(0, slot - 1)]) || + MapSet.new() + } + end + @spec sync_subcommittee_participants(duties(), Types.epoch()) :: %{ non_neg_integer() => [non_neg_integer()] } @@ -399,23 +447,41 @@ defmodule LambdaEthereumConsensus.Validator.Duties do @spec log_duties_for_epoch(duties(), Types.epoch()) :: :ok def log_duties_for_epoch( - %{proposers: proposers, attesters: attesters, sync_committees: sync_committees}, + %{ + proposers: proposers, + attesters: attesters, + sync_committees: sync_committees, + shared: shared + }, epoch ) do Logger.info( "[Duties] Proposers for epoch #{epoch} (slot=>validator):\n #{inspect(proposers)}" ) - for %{ - subnet_ids: si, - validator_index: vi, - aggregation: agg - } <- sync_committees do - Logger.info( - "[Duties] Sync committee for epoch: #{epoch}, validator_index: #{vi} will broadcast on subnet_ids: #{inspect(si)}.\n Slots: #{inspect(agg |> Map.keys() |> Enum.join(", "))}" - ) + Logger.debug( + "[Duties] SyncCommittees Subnets for epoch #{epoch}: #{inspect(shared.subnets.sync_committees)}" + ) + + for {slot, sync_duties} <- sync_committees, + length(sync_duties) > 0 do + Logger.debug("[Duties] Sync committee for epoch: #{epoch}, slot: #{slot}:") + + for %{ + validator_index: vi, + subnet_ids: si, + aggregation: agg + } <- sync_duties do + Logger.debug( + "[Duties] Validator: #{vi}, will broadcast in subnets: #{si} and aggregate in #{inspect(agg |> Enum.map(& &1.subcommittee_index))}." + ) + end end + Logger.debug( + "[Duties] Attesters Subnets for epoch #{epoch}: #{inspect(shared.subnets.attesters)}" + ) + for {slot, att_duties} <- attesters, length(att_duties) > 0 do Logger.debug("[Duties] Attesters for epoch: #{epoch}, slot #{slot}:") diff --git a/lib/lambda_ethereum_consensus/validator/validator_set.ex b/lib/lambda_ethereum_consensus/validator/validator_set.ex index 5ae727e35..05f420928 100644 --- a/lib/lambda_ethereum_consensus/validator/validator_set.ex +++ b/lib/lambda_ethereum_consensus/validator/validator_set.ex @@ -5,10 +5,16 @@ defmodule LambdaEthereumConsensus.ValidatorSet do simplify the delegation of work. """ - defstruct slot: nil, head_root: nil, duties: %{}, validators: %{} + defstruct slot: nil, + head_root: nil, + duties: %{}, + subscribed_subnets: %{attesters: MapSet.new(), sync_committees: MapSet.new()}, + validators: %{} require Logger + alias LambdaEthereumConsensus.P2P.Gossip.Attestation + alias LambdaEthereumConsensus.P2P.Gossip.SyncCommittee alias LambdaEthereumConsensus.StateTransition alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.CheckpointStates @@ -21,6 +27,7 @@ defmodule LambdaEthereumConsensus.ValidatorSet do slot: Types.slot(), head_root: Types.root() | nil, duties: %{Types.epoch() => Duties.duties()}, + subscribed_subnets: %{attesters: Duties.subnets(), sync_committees: Duties.subnets()}, validators: validators() } @@ -137,7 +144,9 @@ defmodule LambdaEthereumConsensus.ValidatorSet do end defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}) do - maybe_propose(set, epoch, slot, head_root) + set + |> maybe_resubscribe_to_subnets(epoch, slot) + |> maybe_propose(epoch, slot, head_root) end defp process_tick(%{head_root: head_root} = set, epoch, {slot, :second_third}) do @@ -325,6 +334,30 @@ defmodule LambdaEthereumConsensus.ValidatorSet do |> then(&%{set | duties: &1}) end + ########################## + # Subnets + + defp maybe_resubscribe_to_subnets(set, epoch, slot) do + %{subscribed_subnets: %{attesters: old_att_subnets, sync_committees: old_sync_subnets}} = set + + %{attesters: new_att_subnets, sync_committees: new_sync_subnets} = + Duties.current_subnets(set.duties, epoch, slot) + + unsubscribe_att = MapSet.difference(old_att_subnets, new_att_subnets) + unsubscribe_sync = MapSet.difference(old_sync_subnets, new_sync_subnets) + + Enum.each(unsubscribe_att, &Attestation.unsubscribe/1) + Enum.each(unsubscribe_sync, &SyncCommittee.unsubscribe/1) + + subscribe_att = MapSet.difference(new_att_subnets, old_att_subnets) + subscribe_sync = MapSet.difference(new_sync_subnets, old_sync_subnets) + + Enum.each(subscribe_att, &Attestation.subscribe/1) + Enum.each(subscribe_sync, &SyncCommittee.subscribe/1) + + %{set | subscribed_subnets: %{attesters: new_att_subnets, sync_committees: new_sync_subnets}} + end + ########################## # Target State # TODO: (#1278) This should be taken from the store as noted by arkenan. diff --git a/lib/types/att_subnet_info.ex b/lib/types/att_subnet_info.ex index 6f0c2594b..b909e8e8e 100644 --- a/lib/types/att_subnet_info.ex +++ b/lib/types/att_subnet_info.ex @@ -44,13 +44,13 @@ defmodule Types.AttSubnetInfo do @doc """ Adds a new Attestation to the SubnetInfo if the attestation's data matches the base one. - Assumes that the SubnetInfo already exists. """ @spec add_attestation!(non_neg_integer(), Types.Attestation.t()) :: :ok - def add_attestation!(subnet_id, attestation) do - subnet_info = fetch_subnet_info!(subnet_id) - - if subnet_info.data == attestation.data do + def add_attestation!(subnet_id, %{data: att_data} = attestation) do + # TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful + # messages and end up with empty aggregations due to the subnet not being created yet. + with {:ok, subnet_info} <- fetch_subnet_info(subnet_id), + ^att_data <- subnet_info.data do new_subnet_info = %__MODULE__{ subnet_info | attestations: [attestation | subnet_info.attestations] @@ -91,12 +91,6 @@ defmodule Types.AttSubnetInfo do end end - @spec fetch_subnet_info!(non_neg_integer()) :: t() - defp fetch_subnet_info!(subnet_id) do - {:ok, subnet_info} = fetch_subnet_info(subnet_id) - subnet_info - end - @spec delete_subnet(non_neg_integer()) :: :ok defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id)) diff --git a/lib/types/sync_subnet_info.ex b/lib/types/sync_subnet_info.ex index 7bf3c9102..43476d580 100644 --- a/lib/types/sync_subnet_info.ex +++ b/lib/types/sync_subnet_info.ex @@ -50,16 +50,15 @@ defmodule Types.SyncSubnetInfo do @doc """ Adds a new SyncCommitteeMessage to the SubnetInfo if the message's data matches the base one. - Assumes that the SubnetInfo already exists. """ @spec add_message!(non_neg_integer(), Types.SyncCommitteeMessage.t()) :: :ok - def add_message!( - subnet_id, - %Types.SyncCommitteeMessage{slot: slot, beacon_block_root: root} = message - ) do - subnet_info = fetch_subnet_info!(subnet_id) + def add_message!(subnet_id, %Types.SyncCommitteeMessage{} = message) do + %{slot: slot, beacon_block_root: root} = message - if subnet_info.data == {slot, root} do + # TODO: (#1302) On delayed scenarios (past second third of the slot) we could discard useful + # messages and end up with empty aggregations due to the subnet not being created yet. + with {:ok, subnet_info} <- fetch_subnet_info(subnet_id), + {^slot, ^root} <- subnet_info.data do new_subnet_info = %__MODULE__{ subnet_info | messages: [message | subnet_info.messages] @@ -100,12 +99,6 @@ defmodule Types.SyncSubnetInfo do end end - @spec fetch_subnet_info!(non_neg_integer()) :: t() - defp fetch_subnet_info!(subnet_id) do - {:ok, subnet_info} = fetch_subnet_info(subnet_id) - subnet_info - end - @spec delete_subnet(non_neg_integer()) :: :ok defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))