diff --git a/go.mod b/go.mod index f4c08bc..16afef6 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-log v1.0.5 github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/koinos/koinos-log-golang v0.0.0-20210621202301-3310a8e5866b - github.com/koinos/koinos-mq-golang v0.0.0-20220907224029-7fc455cb703a + github.com/koinos/koinos-mq-golang v0.0.0-20220923190404-3c5aa9b8945a github.com/koinos/koinos-proto-golang v0.4.1-0.20220906183809-4e07dbd482f6 github.com/koinos/koinos-util-golang v0.0.0-20220831225923-5ba6e0d4e7b9 github.com/libp2p/go-libp2p v0.20.3 @@ -31,6 +31,7 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/prometheus/common v0.33.0 // indirect github.com/spf13/pflag v1.0.5 + github.com/streadway/amqp v1.0.0 // indirect github.com/stretchr/testify v1.7.0 go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect diff --git a/go.sum b/go.sum index d290514..88a85f6 100644 --- a/go.sum +++ b/go.sum @@ -579,6 +579,8 @@ github.com/koinos/koinos-log-golang v0.0.0-20210621202301-3310a8e5866b h1:pZ9L1E github.com/koinos/koinos-log-golang v0.0.0-20210621202301-3310a8e5866b/go.mod h1:/dzAVdA+woySENUYwls8RT+5i87Rm4qoMZ4ctEQI8k0= github.com/koinos/koinos-mq-golang v0.0.0-20220907224029-7fc455cb703a h1:tFMtd4xMDxpJVUgeHxA8N/fExZWD1wl8qVTRaVvSh78= github.com/koinos/koinos-mq-golang v0.0.0-20220907224029-7fc455cb703a/go.mod h1:EFk+fuUL5ezbaWsGJ2U0KlzbgkflwRv96az8dEVe16I= +github.com/koinos/koinos-mq-golang v0.0.0-20220923190404-3c5aa9b8945a h1:FRf8LSzLCVwdwFC/csYVpk+fo9SbIb5icZ72Or6caBA= +github.com/koinos/koinos-mq-golang v0.0.0-20220923190404-3c5aa9b8945a/go.mod h1:N4U4Sja49OQuTKfjql3MEGL+2WtaprX0EbmvU0d3RPk= github.com/koinos/koinos-proto-golang v0.3.1-0.20220708180354-16481ac5469c/go.mod h1:ZonOOdmZcuEbRdOqqdfYRA2I4szYHy5aKzUveMWXBog= github.com/koinos/koinos-proto-golang v0.4.1-0.20220906183809-4e07dbd482f6 h1:CM/2njHtG58C6xw8Yto49D/8/+MoZ+L7fFk9cX63Pbg= github.com/koinos/koinos-proto-golang v0.4.1-0.20220906183809-4e07dbd482f6/go.mod h1:ZonOOdmZcuEbRdOqqdfYRA2I4szYHy5aKzUveMWXBog= @@ -1173,6 +1175,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y= github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= github.com/raulk/go-watchdog v1.2.0 h1:konN75pw2BMmZ+AfuAm5rtFsWcJpKF3m02rKituuXNo= diff --git a/internal/node/node_test.go b/internal/node/node_test.go index 21e648f..0d69506 100644 --- a/internal/node/node_test.go +++ b/internal/node/node_test.go @@ -72,7 +72,7 @@ func (k *TestRPC) GetBlocksByID(ctx context.Context, blockIDs []multihash.Multih return nil, nil } -func (k *TestRPC) BroadcastGossipStatus(enabled bool) error { +func (k *TestRPC) BroadcastGossipStatus(ctx context.Context, enabled bool) error { return nil } diff --git a/internal/p2p/applicator_test.go b/internal/p2p/applicator_test.go index 31388cd..2021278 100644 --- a/internal/p2p/applicator_test.go +++ b/internal/p2p/applicator_test.go @@ -63,7 +63,7 @@ func (b *applicatorTestRPC) GetBlocksByID(ctx context.Context, blockIDs []multih return &block_store.GetBlocksByIdResponse{}, nil } -func (b *applicatorTestRPC) BroadcastGossipStatus(enabled bool) error { +func (b *applicatorTestRPC) BroadcastGossipStatus(ctx context.Context, enabled bool) error { return nil } diff --git a/internal/p2p/gossip_toggle.go b/internal/p2p/gossip_toggle.go index 77eefb8..c417049 100644 --- a/internal/p2p/gossip_toggle.go +++ b/internal/p2p/gossip_toggle.go @@ -51,13 +51,13 @@ func (g *GossipToggle) checkThresholds(ctx context.Context) { g.enabled = true g.gossipEnabler.EnableGossip(ctx, true) if g.rpc != nil { - _ = g.rpc.BroadcastGossipStatus(true) + _ = g.rpc.BroadcastGossipStatus(ctx, true) } } else if g.opts.DisableThreshold-threshold >= -epsilon && g.enabled { g.enabled = false g.gossipEnabler.EnableGossip(ctx, false) if g.rpc != nil { - _ = g.rpc.BroadcastGossipStatus(false) + _ = g.rpc.BroadcastGossipStatus(ctx, false) } } } diff --git a/internal/p2p_test.go b/internal/p2p_test.go index 3d815e4..5f8b9bf 100644 --- a/internal/p2p_test.go +++ b/internal/p2p_test.go @@ -123,7 +123,7 @@ func (k *TestRPC) ApplyTransaction(ctx context.Context, block *protocol.Transact return &chain.SubmitTransactionResponse{}, nil } -func (k *TestRPC) BroadcastGossipStatus(enabled bool) error { +func (k *TestRPC) BroadcastGossipStatus(ctx context.Context, enabled bool) error { return nil } diff --git a/internal/rpc/koinos_rpc.go b/internal/rpc/koinos_rpc.go index 271866d..2b0e187 100644 --- a/internal/rpc/koinos_rpc.go +++ b/internal/rpc/koinos_rpc.go @@ -367,14 +367,14 @@ func (k *KoinosRPC) GetForkHeads(ctx context.Context) (*chainrpc.GetForkHeadsRes } // BroadcastGossipStatus broadcasts the gossip status to the -func (k *KoinosRPC) BroadcastGossipStatus(enabled bool) error { +func (k *KoinosRPC) BroadcastGossipStatus(ctx context.Context, enabled bool) error { status := &broadcast.GossipStatus{Enabled: enabled} data, err := canonical.Marshal(status) if err != nil { return fmt.Errorf("%w BroadcastGossipStatus, %s", p2perrors.ErrSerialization, err) } - return k.mq.Broadcast("application/octet-stream", "koinos.gossip.status", data) + return k.mq.Broadcast(ctx, "application/octet-stream", "koinos.gossip.status", data) } // IsConnectedToBlockStore returns if the AMQP connection can currently communicate diff --git a/internal/rpc/local_rpc.go b/internal/rpc/local_rpc.go index fce3991..b6d198d 100644 --- a/internal/rpc/local_rpc.go +++ b/internal/rpc/local_rpc.go @@ -18,7 +18,7 @@ type LocalRPC interface { GetChainID(ctx context.Context) (*chain.GetChainIdResponse, error) GetForkHeads(ctx context.Context) (*chain.GetForkHeadsResponse, error) GetBlocksByID(ctx context.Context, blockIDs []multihash.Multihash) (*block_store.GetBlocksByIdResponse, error) - BroadcastGossipStatus(enabled bool) error + BroadcastGossipStatus(ctx context.Context, enabled bool) error IsConnectedToBlockStore(ctx context.Context) (bool, error) IsConnectedToChain(ctx context.Context) (bool, error)