diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 296df1fd0..f95a022e8 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -105,7 +105,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo return nil, err } - chainId, err := ethHttpClient.ChainID(context.Background()) + chainId, err := ethHttpClient.ChainID(ctx) if err != nil { logger.Error("Cannot get chainId", "err", err) return nil, err diff --git a/aggregator/cmd/main.go b/aggregator/cmd/main.go index 0896f0958..c6a3660f1 100644 --- a/aggregator/cmd/main.go +++ b/aggregator/cmd/main.go @@ -39,13 +39,13 @@ func main() { func aggregatorMain(ctx *cli.Context) error { log.Println("Initializing Aggregator") - rawConfig := config.NewRawConfig(ctx) - logger, err := sdklogging.NewZapLogger(rawConfig.Environment) + configRaw := config.NewConfigRaw(ctx) + logger, err := sdklogging.NewZapLogger(configRaw.Environment) if err != nil { return err } - config, err := config.NewConfig(rawConfig, ctx) + config, err := config.NewConfig(configRaw, ctx) if err != nil { logger.Fatal("Error creating config", "err", err) return err diff --git a/core/config/config.go b/core/config/config.go index 28f0c3ddb..7b49981a5 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -55,14 +55,14 @@ type SFFLContractsRaw struct { OperatorStateRetrieverAddr string `json:"operatorStateRetriever"` } -func NewRawConfig(ctx *cli.Context) ConfigRaw { - var rawConfig ConfigRaw +func NewConfigRaw(ctx *cli.Context) ConfigRaw { + var configRaw ConfigRaw configFilePath := ctx.GlobalString(ConfigFileFlag.Name) if configFilePath != "" { - sdkutils.ReadYamlConfig(configFilePath, &rawConfig) + sdkutils.ReadYamlConfig(configFilePath, &configRaw) } - return rawConfig + return configRaw } // NewConfig parses config file to read from from flags or environment variables diff --git a/operator/attestor/attestor.go b/operator/attestor/attestor.go index b2a1a05d4..88f303e14 100644 --- a/operator/attestor/attestor.go +++ b/operator/attestor/attestor.go @@ -68,7 +68,7 @@ type Attestor struct { rollupIdsToUrls map[uint32]string clients map[uint32]eth.EthClient notifier Notifier - consumer consumer.Consumer + consumer *consumer.Consumer registry *prometheus.Registry config *types.NodeConfig diff --git a/operator/consumer/consumer.go b/operator/consumer/consumer.go index 0ebc6ee63..3dc9e4200 100644 --- a/operator/consumer/consumer.go +++ b/operator/consumer/consumer.go @@ -62,7 +62,7 @@ type Consumer struct { logger logging.Logger } -func NewConsumer(config ConsumerConfig, logger logging.Logger) Consumer { +func NewConsumer(config ConsumerConfig, logger logging.Logger) *Consumer { ctx, cancel := context.WithCancel(context.Background()) consumer := Consumer{ @@ -74,7 +74,7 @@ func NewConsumer(config ConsumerConfig, logger logging.Logger) Consumer { go consumer.Reconnect(config.Addr, ctx) - return consumer + return &consumer } func (consumer *Consumer) Reconnect(addr string, ctx context.Context) { @@ -106,7 +106,7 @@ func (consumer *Consumer) Reconnect(addr string, ctx context.Context) { case <-ctx.Done(): consumer.logger.Info("Consumer context canceled") // deref cancel smth? - break + return case err := <-consumer.connClosedErrC: if !err.Recover { diff --git a/operator/consumer/queues_listener.go b/operator/consumer/queues_listener.go index 2ffef5735..8eb3a9ea6 100644 --- a/operator/consumer/queues_listener.go +++ b/operator/consumer/queues_listener.go @@ -65,7 +65,7 @@ func (listener *QueuesListener) listen(rollupId uint32, rollupDataC <-chan rmq.D case <-ctx.Done(): listener.logger.Info("Consumer context canceled") // TODO: some closing and canceling here - break + return } } } diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 14e838d3e..23d1f2d28 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -46,11 +46,12 @@ const TEST_DATA_DIR = "../../test_data" func TestIntegration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + setup := setupTestEnv(t, ctx) t.Cleanup(func() { cancel() - }) - setup := setupTestEnv(t, ctx) + setup.cleanup() + }) time.Sleep(10 * time.Second) @@ -126,6 +127,7 @@ type testEnv struct { avsReader *chainio.AvsReader registryRollups []*registryrollup.ContractSFFLRegistryRollup registryRollupAuths []*bind.TransactOpts + cleanup func() } func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { @@ -181,7 +183,7 @@ func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { registryRollups, registryRollupAuths := deployRegistryRollups(t, ctx, avsReader, rollupAnvils) - t.Cleanup(func() { + cleanup := func() { if err := os.RemoveAll(TEST_DATA_DIR); err != nil { t.Fatalf("Error cleaning test data dir: %s", err.Error()) } @@ -207,7 +209,7 @@ func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { } cancelContainersCtx() - }) + } return &testEnv{ mainnetAnvil: mainnetAnvil, @@ -220,6 +222,7 @@ func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { avsReader: avsReader, registryRollups: registryRollups, registryRollupAuths: registryRollupAuths, + cleanup: cleanup, } }