diff --git a/.gitignore b/.gitignore index 0fa41cb8..736eaa6f 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ target # IDE .idea +.vscode # just for example id_rsa diff --git a/config-files/operator-docker-compose.anvil.yaml b/config-files/operator-docker-compose.anvil.yaml index 2cb64d7c..b87005da 100644 --- a/config-files/operator-docker-compose.anvil.yaml +++ b/config-files/operator-docker-compose.anvil.yaml @@ -47,3 +47,7 @@ register_operator_on_startup: true # address of token to deposit tokens into when registering on startup # addresses.erc20MockStrategy in tests/anvil/sffl_avs_deployment_output.json token_strategy_addr: 0xa85233C63b9Ee964Add6F2cffe00Fd84eb32338f + +near_da_indexer_ip_port_address: localhost:5672 +near_da_indexer_consumer_tag: operator +near_da_indexer_rollup_ids: [0] diff --git a/config-files/operator.anvil.yaml b/config-files/operator.anvil.yaml index 656bc603..cb4de26f 100644 --- a/config-files/operator.anvil.yaml +++ b/config-files/operator.anvil.yaml @@ -41,3 +41,7 @@ register_operator_on_startup: true # address of token to deposit tokens into when registering on startup # addresses.erc20MockStrategy in tests/anvil/sffl_avs_deployment_output.json token_strategy_addr: 0xa85233C63b9Ee964Add6F2cffe00Fd84eb32338f + +near_da_indexer_ip_port_address: localhost:5672 +near_da_indexer_consumer_tag: operator +near_da_indexer_rollup_ids: [0] diff --git a/go.mod b/go.mod index a4e9f5be..ad505cb0 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/rabbitmq/amqp091-go v1.9.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 039528e8..4cdb7529 100644 --- a/go.sum +++ b/go.sum @@ -336,6 +336,8 @@ github.com/prometheus/common v0.46.0 h1:doXzt5ybi1HBKpsZOL0sSkaNHJJqkyfEWZGGqqSc github.com/prometheus/common v0.46.0/go.mod h1:Tp0qkxpb9Jsg54QMe+EAmqXkSV7Evdy1BTn+g2pa/hQ= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -429,6 +431,8 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -567,6 +571,7 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/indexer/src/rabbit_publisher.rs b/indexer/src/rabbit_publisher.rs index 2b88d707..ec0f884b 100644 --- a/indexer/src/rabbit_publisher.rs +++ b/indexer/src/rabbit_publisher.rs @@ -10,6 +10,7 @@ use crate::errors::{Error, Result}; const DEFAULT_EXCHANGE: &str = ""; const DEFAULT_ROUTING_KEY: &str = "da-mq"; +const PERSISTENT_DELIVERY_MODE: u8 = 2; pub(crate) type Connection = deadpool::managed::Object; @@ -27,7 +28,7 @@ impl Default for PublishOptions { exchange: DEFAULT_EXCHANGE.into(), routing_key: DEFAULT_ROUTING_KEY.into(), basic_publish_options: BasicPublishOptions::default(), - basic_properties: BasicProperties::default(), + basic_properties: BasicProperties::default().with_delivery_mode(PERSISTENT_DELIVERY_MODE) } } } diff --git a/operator/consumer/cmd/main.go b/operator/consumer/cmd/main.go new file mode 100644 index 00000000..c2e318c4 --- /dev/null +++ b/operator/consumer/cmd/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "errors" + "log" + "os" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/NethermindEth/near-sffl/operator/consumer" + "github.com/urfave/cli" +) + +func main() { + app := cli.NewApp() + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "rmq-address", + Required: true, + Usage: "Connect to RMQ publisher at `ADDRESS`", + }, + cli.StringFlag{ + Name: "consumer-tag", + Value: "da-consumer", + Usage: "Connect to RMQ publisher using `TAG`", + }, + cli.Int64SliceFlag{ + Name: "rollup-ids", + Required: true, + Usage: "Consume data from rollup `ID`", + }, + } + app.Name = "sffl-indexer-consumer" + app.Usage = "SFFL Indexer Consumer" + app.Description = "Super Fast Finality Layer test service to consume NEAR DA published block data from the indexer" + + app.Action = consumerMain + err := app.Run(os.Args) + if err != nil { + log.Fatalln("Application failed. Message:", err) + } +} + +func consumerMain(ctx *cli.Context) error { + log.Println("Initializing Consumer") + + logLevel := logging.Development + logger, err := logging.NewZapLogger(logLevel) + if err != nil { + panic(err) + } + + rollupIdsArg := ctx.GlobalInt64Slice("rollup-ids") + rollupIds := make([]uint32, len(rollupIdsArg)) + for i, el := range rollupIdsArg { + if el < 0 { + return errors.New("Rollup IDs should not be < 0") + } + + rollupIds[i] = uint32(el) + } + + consumer := consumer.NewConsumer(consumer.ConsumerConfig{ + Addr: ctx.GlobalString("rmq-address"), + ConsumerTag: ctx.GlobalString("consumer-tag"), + RollupIds: rollupIds, + }, logger) + + blockStream := consumer.GetBlockStream() + + for { + block := <-blockStream + logger.Info("Block received", "block", block) + } +} diff --git a/operator/consumer/consumer.go b/operator/consumer/consumer.go new file mode 100644 index 00000000..68d9ba1b --- /dev/null +++ b/operator/consumer/consumer.go @@ -0,0 +1,246 @@ +package consumer + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum/core/types" + rmq "github.com/rabbitmq/amqp091-go" +) + +const ( + RECONNECT_DELAY = 3 * time.Second + RECHANNEL_DELAY = 2 * time.Second +) + +var ( + AlreadyClosedError = errors.New("Consumer connection is already closed") +) + +func getQueueName(rollupId uint32) string { + return "rollup" + strconv.FormatUint(uint64(rollupId), 10) +} + +type ConsumerConfig struct { + Addr string + ConsumerTag string + RollupIds []uint32 +} + +type BlockData struct { + RollupId uint32 + Block types.Block +} + +type Consumerer interface { + Reconnect(addr string, ctx context.Context) + ResetChannel(conn *rmq.Connection, ctx context.Context) bool + Close() error + GetBlockStream() <-chan BlockData +} + +type Consumer struct { + consumerTag string + receivedBlocksC chan BlockData + queuesListener QueuesListener + + rollupIds []uint32 + + isReady bool + contextCancelFunc context.CancelFunc + connection *rmq.Connection + connClosedErrC <-chan *rmq.Error + channel *rmq.Channel + chanClosedErrC <-chan *rmq.Error + + logger logging.Logger +} + +func NewConsumer(config ConsumerConfig, logger logging.Logger) Consumer { + ctx, cancel := context.WithCancel(context.Background()) + + consumer := Consumer{ + consumerTag: config.ConsumerTag, + rollupIds: config.RollupIds, + receivedBlocksC: make(chan BlockData), + contextCancelFunc: cancel, + logger: logger, + } + + go consumer.Reconnect(config.Addr, ctx) + + return consumer +} + +func (consumer *Consumer) Reconnect(addr string, ctx context.Context) { + for { + consumer.logger.Info("Reconnecting...") + + consumer.isReady = false + conn, err := consumer.connect(addr) + if err != nil { + consumer.logger.Warn("Connection setup failed", "err", err) + + select { + case <-ctx.Done(): + consumer.logger.Info("Consumer context canceled") + return + case <-time.After(RECONNECT_DELAY): + } + + continue + } + + if done := consumer.ResetChannel(conn, ctx); done { + return + } + + consumer.logger.Info("Connected") + + select { + case <-ctx.Done(): + consumer.logger.Info("Consumer context canceled") + // deref cancel smth? + break + + case err := <-consumer.connClosedErrC: + if !err.Recover { + consumer.logger.Error("Can't recover connection", "err", err) + break + } + + consumer.logger.Warn("Recovering connection, closed with:", "err", err) + + case err := <-consumer.chanClosedErrC: + if !err.Recover { + consumer.logger.Error("Can't recover connection", "err", err) + break + } + + consumer.logger.Warn("Reconnecting channel, closed with:", "err", err) + } + } +} + +func (consumer *Consumer) connect(addr string) (*rmq.Connection, error) { + conn, err := rmq.Dial(addr) + if err != nil { + return nil, err + } + + consumer.changeConnection(conn) + return conn, nil +} + +func (consumer *Consumer) changeConnection(conn *rmq.Connection) { + consumer.connection = conn + + connClosedErrC := make(chan *rmq.Error) + consumer.connClosedErrC = conn.NotifyClose(connClosedErrC) +} + +func (consumer *Consumer) ResetChannel(conn *rmq.Connection, ctx context.Context) bool { + for { + consumer.isReady = false + + err := consumer.setupChannel(conn, ctx) + if err != nil { + consumer.logger.Warn("Channel setup failed", "err", err) + + select { + case <-ctx.Done(): + consumer.logger.Info("Consumer context canceled") + return true + + case rmqError := <-consumer.connClosedErrC: + if rmqError.Recover { + consumer.logger.Error("Can't recover connection", "err", err) + return true + } + + consumer.logger.Warn("Recovering connection, closed with:", "err", err) + return false + case <-time.After(RECHANNEL_DELAY): + } + + continue + } + + return false + } +} + +func (consumer *Consumer) setupChannel(conn *rmq.Connection, ctx context.Context) error { + channel, err := conn.Channel() + if err != nil { + return err + } + + listener := NewQueuesListener(consumer.receivedBlocksC, consumer.logger) + for _, rollupId := range consumer.rollupIds { + queue, err := channel.QueueDeclare(getQueueName(rollupId), true, false, false, false, nil) + if err != nil { + return err + } + + rollupDataC, err := channel.Consume( + queue.Name, + consumer.consumerTag, + false, + false, + false, + false, + nil, + ) + + if err != nil { + return err + } + + err = listener.Add(rollupId, rollupDataC, ctx) + if err != nil { + return err + } + } + + consumer.queuesListener = listener + consumer.changeChannel(channel) + consumer.isReady = true + return nil +} + +func (consumer *Consumer) changeChannel(channel *rmq.Channel) { + consumer.channel = channel + + chanClosedErrC := make(chan *rmq.Error) + consumer.chanClosedErrC = channel.NotifyClose(chanClosedErrC) +} + +func (consumer *Consumer) Close() error { + if !consumer.isReady { + return AlreadyClosedError + } + + // shut down goroutines + consumer.contextCancelFunc() + + err := consumer.channel.Close() + if err != nil { + return err + } + + err = consumer.connection.Close() + if err != nil { + return err + } + + consumer.isReady = false + return nil +} + +func (consumer Consumer) GetBlockStream() <-chan BlockData { + return consumer.receivedBlocksC +} diff --git a/operator/consumer/queues_listener.go b/operator/consumer/queues_listener.go new file mode 100644 index 00000000..2ffef573 --- /dev/null +++ b/operator/consumer/queues_listener.go @@ -0,0 +1,71 @@ +package consumer + +import ( + "context" + "errors" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + rmq "github.com/rabbitmq/amqp091-go" +) + +var ( + QueueExistsError = errors.New("Queue already exists") +) + +type QueuesListener struct { + receivedBlocksC chan<- BlockData + queueDeliveryCs map[uint32]<-chan rmq.Delivery + + logger logging.Logger +} + +func NewQueuesListener(receivedBlocksC chan<- BlockData, logger logging.Logger) QueuesListener { + listener := QueuesListener{ + receivedBlocksC: receivedBlocksC, + queueDeliveryCs: make(map[uint32]<-chan rmq.Delivery), + logger: logger, + } + + return listener +} + +func (listener *QueuesListener) Add(rollupId uint32, rollupDataC <-chan rmq.Delivery, ctx context.Context) error { + if _, exists := listener.queueDeliveryCs[rollupId]; exists { + return QueueExistsError + } + + listener.queueDeliveryCs[rollupId] = rollupDataC + go listener.listen(rollupId, rollupDataC, ctx) + + return nil +} + +func (listener *QueuesListener) listen(rollupId uint32, rollupDataC <-chan rmq.Delivery, ctx context.Context) { + for { + select { + case d, ok := <-rollupDataC: + if !ok { + listener.logger.Info("Deliveries channel close", "rollupId", rollupId) + break + } + + listener.logger.Info("New delivery", "rollupId", rollupId) + + var block types.Block + if err := rlp.DecodeBytes(d.Body, &block); err != nil { + listener.logger.Warn("Invalid block", "rollupId", rollupId, "err", err) + continue + } + + listener.receivedBlocksC <- BlockData{RollupId: rollupId, Block: block} + d.Ack(false) + + case <-ctx.Done(): + listener.logger.Info("Consumer context canceled") + // TODO: some closing and canceling here + break + } + } +} diff --git a/operator/operator.go b/operator/operator.go index d70f9922..b503a7df 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -14,6 +14,7 @@ import ( "github.com/NethermindEth/near-sffl/core" "github.com/NethermindEth/near-sffl/core/chainio" "github.com/NethermindEth/near-sffl/metrics" + "github.com/NethermindEth/near-sffl/operator/consumer" "github.com/NethermindEth/near-sffl/types" "github.com/Layr-Labs/eigensdk-go/chainio/clients" @@ -63,6 +64,8 @@ type Operator struct { aggregatorRpcClient AggregatorRpcClienter // needed when opting in to avs (allow this service manager contract to slash operator) sfflServiceManagerAddr common.Address + // NEAR DA indexer consumer + consumer consumer.Consumerer } // TODO(samlaf): config is a mess right now, since the chainio client constructors @@ -197,6 +200,12 @@ func NewOperatorFromConfig(c types.NodeConfig) (*Operator, error) { return nil, err } + consumer := consumer.NewConsumer(consumer.ConsumerConfig{ + Addr: c.NearDaIndexerRmqIpPortAddress, + ConsumerTag: c.NearDaIndexerConsumerTag, + RollupIds: c.NearDaIndexerRollupIds, + }, logger) + operator := &Operator{ config: c, logger: logger, @@ -216,7 +225,7 @@ func NewOperatorFromConfig(c types.NodeConfig) (*Operator, error) { checkpointTaskCreatedChan: make(chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated), sfflServiceManagerAddr: common.HexToAddress(c.AVSRegistryCoordinatorAddress), operatorId: [32]byte{0}, // this is set below - + consumer: &consumer, } if c.RegisterOperatorOnStartup { @@ -274,6 +283,9 @@ func (o *Operator) Start(ctx context.Context) error { // TODO(samlaf): wrap this call with increase in avs-node-spec metric sub := o.avsSubscriber.SubscribeToNewTasks(o.checkpointTaskCreatedChan) + + blockReceivedChan := o.consumer.GetBlockStream() + for { select { case <-ctx.Done(): @@ -296,6 +308,9 @@ func (o *Operator) Start(ctx context.Context) error { continue } go o.aggregatorRpcClient.SendSignedCheckpointTaskResponseToAggregator(signedCheckpointTaskResponse) + case blockData := <-blockReceivedChan: + o.logger.Info("Block received on operator", "rollupId", blockData.RollupId, "block", blockData.Block) + continue } } } diff --git a/operator/operator_test.go b/operator/operator_test.go index 0cd6ee60..64484586 100644 --- a/operator/operator_test.go +++ b/operator/operator_test.go @@ -22,7 +22,7 @@ import ( ) func TestOperator(t *testing.T) { - operator, err := createMockOperator() + operator, _, err := createMockOperator() assert.Nil(t, err) const taskIndex = 1 diff --git a/operator/registration_test.go b/operator/registration_test.go index 1e60105d..88248018 100644 --- a/operator/registration_test.go +++ b/operator/registration_test.go @@ -1,9 +1,11 @@ package operator import ( + "context" "testing" "github.com/prometheus/client_golang/prometheus" + rmq "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -11,6 +13,7 @@ import ( taskmanager "github.com/NethermindEth/near-sffl/contracts/bindings/SFFLTaskManager" "github.com/NethermindEth/near-sffl/metrics" + "github.com/NethermindEth/near-sffl/operator/consumer" "github.com/NethermindEth/near-sffl/tests" ) @@ -25,23 +28,48 @@ var MOCK_OPERATOR_ID = [32]byte{207, 73, 226, 221, 104, 100, 123, 41, 192, 3, 9, func IntegrationTestOperatorRegistration(t *testing.T) { anvilCmd := tests.StartAnvilChainAndDeployContracts() defer anvilCmd.Process.Kill() - operator, err := createMockOperator() + operator, _, err := createMockOperator() assert.Nil(t, err) err = operator.RegisterOperatorWithEigenlayer() assert.Nil(t, err) } -func createMockOperator() (*Operator, error) { +type MockConsumer struct { + blockReceivedC chan consumer.BlockData +} + +func createMockConsumer() *MockConsumer { + return &MockConsumer{ + blockReceivedC: make(chan consumer.BlockData), + } +} +func (c *MockConsumer) Reconnect(addr string, ctx context.Context) {} +func (c *MockConsumer) ResetChannel(conn *rmq.Connection, ctx context.Context) bool { + return true +} +func (c *MockConsumer) Close() error { + return nil +} +func (c *MockConsumer) GetBlockStream() <-chan consumer.BlockData { + return c.blockReceivedC +} +func (c *MockConsumer) MockReceiveBlockData(data consumer.BlockData) { + c.blockReceivedC <- data +} + +func createMockOperator() (*Operator, *MockConsumer, error) { logger := sdklogging.NewNoopLogger() reg := prometheus.NewRegistry() noopMetrics := metrics.NewNoopMetrics() blsPrivateKey, err := bls.NewPrivateKey(MOCK_OPERATOR_BLS_PRIVATE_KEY) if err != nil { - return nil, err + return nil, nil, err } operatorKeypair := bls.NewKeyPair(blsPrivateKey) + mockConsumer := createMockConsumer() + operator := &Operator{ logger: logger, blsKeypair: operatorKeypair, @@ -49,6 +77,8 @@ func createMockOperator() (*Operator, error) { metrics: noopMetrics, checkpointTaskCreatedChan: make(chan *taskmanager.ContractSFFLTaskManagerCheckpointTaskCreated), operatorId: MOCK_OPERATOR_ID, + consumer: mockConsumer, } - return operator, nil + + return operator, mockConsumer, nil } diff --git a/types/avs_config.go b/types/avs_config.go index 5157698a..eabe57b5 100644 --- a/types/avs_config.go +++ b/types/avs_config.go @@ -2,19 +2,22 @@ package types type NodeConfig struct { // used to set the logger level (true = info, false = debug) - Production bool `yaml:"production"` - OperatorAddress string `yaml:"operator_address"` - OperatorStateRetrieverAddress string `yaml:"operator_state_retriever_address"` - AVSRegistryCoordinatorAddress string `yaml:"avs_registry_coordinator_address"` - TokenStrategyAddr string `yaml:"token_strategy_addr"` - EthRpcUrl string `yaml:"eth_rpc_url"` - EthWsUrl string `yaml:"eth_ws_url"` - BlsPrivateKeyStorePath string `yaml:"bls_private_key_store_path"` - EcdsaPrivateKeyStorePath string `yaml:"ecdsa_private_key_store_path"` - AggregatorServerIpPortAddress string `yaml:"aggregator_server_ip_port_address"` - RegisterOperatorOnStartup bool `yaml:"register_operator_on_startup"` - EigenMetricsIpPortAddress string `yaml:"eigen_metrics_ip_port_address"` - EnableMetrics bool `yaml:"enable_metrics"` - NodeApiIpPortAddress string `yaml:"node_api_ip_port_address"` - EnableNodeApi bool `yaml:"enable_node_api"` + Production bool `yaml:"production"` + OperatorAddress string `yaml:"operator_address"` + OperatorStateRetrieverAddress string `yaml:"operator_state_retriever_address"` + AVSRegistryCoordinatorAddress string `yaml:"avs_registry_coordinator_address"` + TokenStrategyAddr string `yaml:"token_strategy_addr"` + EthRpcUrl string `yaml:"eth_rpc_url"` + EthWsUrl string `yaml:"eth_ws_url"` + BlsPrivateKeyStorePath string `yaml:"bls_private_key_store_path"` + EcdsaPrivateKeyStorePath string `yaml:"ecdsa_private_key_store_path"` + AggregatorServerIpPortAddress string `yaml:"aggregator_server_ip_port_address"` + RegisterOperatorOnStartup bool `yaml:"register_operator_on_startup"` + EigenMetricsIpPortAddress string `yaml:"eigen_metrics_ip_port_address"` + EnableMetrics bool `yaml:"enable_metrics"` + NodeApiIpPortAddress string `yaml:"node_api_ip_port_address"` + EnableNodeApi bool `yaml:"enable_node_api"` + NearDaIndexerRmqIpPortAddress string `yaml:"near_da_indexer_ip_port_address"` + NearDaIndexerConsumerTag string `yaml:"near_da_indexer_consumer_tag"` + NearDaIndexerRollupIds []uint32 `yaml:"near_da_indexer_rollup_ids"` }