From 55d0b6da199d0d5496cd0a4f40f4cc2a3a533fb4 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Wed, 6 Mar 2024 10:48:42 -0500 Subject: [PATCH] [CT-652] add command line flag for full node streaming (#1145) --- protocol/app/app.go | 10 ++++++---- protocol/app/flags/flags.go | 32 ++++++++++++++++++++++++++++++++ protocol/app/flags/flags_test.go | 29 +++++++++++++++++++++++++++-- 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index 53d18c9b37..b737375fdb 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -685,7 +685,7 @@ func New( indexerFlags.SendOffchainData, ) - app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger) + app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, logger) timeProvider := &timelib.TimeProviderImpl{} @@ -1756,9 +1756,11 @@ func getIndexerFromOptions( // options. This function will default to returning a no-op instance. func getGrpcStreamingManagerFromOptions( appFlags flags.Flags, - appOpts servertypes.AppOptions, logger log.Logger, ) (manager streamingtypes.GrpcStreamingManager) { - // TODO(CT-625): add command line flags for full node streaming. - return streaming.NewGrpcStreamingManager() + if appFlags.GrpcStreamingEnabled { + logger.Info("GRPC streaming is enabled") + return streaming.NewGrpcStreamingManager() + } + return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 6f67611a22..18fa78a19b 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -19,6 +19,9 @@ type Flags struct { // Existing flags GrpcAddress string GrpcEnable bool + + // Grpc Streaming + GrpcStreamingEnabled bool } // List of CLI flags. @@ -31,6 +34,9 @@ const ( // Cosmos flags below. These config values can be set as flags or in config.toml. GrpcAddress = "grpc.address" GrpcEnable = "grpc.enable" + + // Grpc Streaming + GrpcStreamingEnabled = "grpc-streaming-enabled" ) // Default values. @@ -39,6 +45,8 @@ const ( DefaultDdTraceAgentPort = 8126 DefaultNonValidatingFullNode = false DefaultDdErrorTrackingFormat = false + + DefaultGrpcStreamingEnabled = false ) // AddFlagsToCmd adds flags to app initialization. @@ -67,6 +75,11 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultDdErrorTrackingFormat, "Enable formatting of log error tags to datadog error tracking format", ) + cmd.Flags().Bool( + GrpcStreamingEnabled, + DefaultGrpcStreamingEnabled, + "Whether to enable grpc streaming for full nodes", + ) } // Validate checks that the flags are valid. @@ -75,6 +88,17 @@ func (f *Flags) Validate() error { if !f.NonValidatingFullNode && !f.GrpcEnable { return fmt.Errorf("grpc.enable must be set to true - validating requires gRPC server") } + + // Grpc streaming + if f.GrpcStreamingEnabled { + if !f.GrpcEnable { + return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server") + } + + if !f.NonValidatingFullNode { + return fmt.Errorf("grpc-streaming-enabled can only be set to true for non-validating full nodes") + } + } return nil } @@ -93,6 +117,8 @@ func GetFlagValuesFromOptions( // These are the default values from the Cosmos flags. GrpcAddress: config.DefaultGRPCAddress, GrpcEnable: true, + + GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, } // Populate the flags if they exist. @@ -132,5 +158,11 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(GrpcStreamingEnabled); option != nil { + if v, err := cast.ToBoolE(option); err == nil { + result.GrpcStreamingEnabled = v + } + } + return result } diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index aaba0016b3..c170ec864f 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -2,9 +2,10 @@ package flags_test import ( "fmt" - "github.com/cosmos/cosmos-sdk/server/config" "testing" + "github.com/cosmos/cosmos-sdk/server/config" + "github.com/dydxprotocol/v4-chain/protocol/app/flags" "github.com/dydxprotocol/v4-chain/protocol/mocks" "github.com/spf13/cobra" @@ -27,7 +28,11 @@ func TestAddFlagsToCommand(t *testing.T) { }, fmt.Sprintf("Has %s flag", flags.DdTraceAgentPort): { flagName: flags.DdTraceAgentPort, - }} + }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingEnabled): { + flagName: flags.GrpcStreamingEnabled, + }, + } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -62,6 +67,22 @@ func TestValidate(t *testing.T) { }, expectedErr: fmt.Errorf("grpc.enable must be set to true - validating requires gRPC server"), }, + "failure - gRPC streaming enabled for validating nodes": { + flags: flags.Flags{ + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + }, + expectedErr: fmt.Errorf("grpc-streaming-enabled can only be set to true for non-validating full nodes"), + }, + "failure - gRPC streaming enabled with gRPC disabled": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: false, + GrpcStreamingEnabled: true, + }, + expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"), + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -86,6 +107,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedDdTraceAgentPort uint16 expectedGrpcAddress string expectedGrpcEnable bool + expectedGrpcStreamingEnable bool }{ "Sets to default if unset": { expectedNonValidatingFullNodeFlag: false, @@ -93,6 +115,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedDdTraceAgentPort: 8126, expectedGrpcAddress: "localhost:9090", expectedGrpcEnable: true, + expectedGrpcStreamingEnable: false, }, "Sets values from options": { optsMap: map[string]any{ @@ -101,12 +124,14 @@ func TestGetFlagValuesFromOptions(t *testing.T) { flags.DdTraceAgentPort: uint16(777), flags.GrpcEnable: false, flags.GrpcAddress: "localhost:9091", + flags.GrpcStreamingEnabled: "true", }, expectedNonValidatingFullNodeFlag: true, expectedDdAgentHost: "agentHostTest", expectedDdTraceAgentPort: 777, expectedGrpcEnable: false, expectedGrpcAddress: "localhost:9091", + expectedGrpcStreamingEnable: true, }, }