diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go deleted file mode 100644 index 90c39509b..000000000 --- a/cmd/cli/cli.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cli - -import ( - "fmt" - "os" - - "github.com/conduitio/conduit/pkg/conduit" - "github.com/spf13/cobra" - "github.com/spf13/pflag" -) - -var ( - initArgs InitArgs - pipelinesInitArgs PipelinesInitArgs -) - -type Instance struct { - rootCmd *cobra.Command -} - -// New creates a new CLI Instance. -func New() *Instance { - return &Instance{ - rootCmd: buildRootCmd(), - } -} - -func (i *Instance) Run() { - if err := i.rootCmd.Execute(); err != nil { - _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) - os.Exit(1) - } -} - -func buildRootCmd() *cobra.Command { - cfg := conduit.DefaultConfig() - - cmd := &cobra.Command{ - Use: "conduit", - Short: "Conduit CLI", - Long: "Conduit CLI is a command-line that helps you interact with and manage Conduit.", - Version: conduit.Version(true), - Run: func(cmd *cobra.Command, args []string) { - e := &conduit.Entrypoint{} - e.Serve(cfg) - }, - } - cmd.CompletionOptions.DisableDefaultCmd = true - conduit.Flags(&cfg).VisitAll(cmd.Flags().AddGoFlag) - - // init - cmd.AddCommand(buildInitCmd()) - - // pipelines - cmd.AddGroup(&cobra.Group{ - ID: "pipelines", - Title: "Pipelines", - }) - cmd.AddCommand(buildPipelinesCmd()) - - // mark hidden flags - cmd.Flags().VisitAll(func(f *pflag.Flag) { - if conduit.HiddenFlags[f.Name] { - err := cmd.Flags().MarkHidden(f.Name) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to mark flag %q as hidden: %v", f.Name, err) - } - } - }) - - return cmd -} - -func buildInitCmd() *cobra.Command { - initCmd := &cobra.Command{ - Use: "init", - Short: "Initialize Conduit with a configuration file and directories.", - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { - return NewConduitInit(initArgs).Run() - }, - } - initCmd.Flags().StringVar( - &initArgs.Path, - "config.path", - "", - "path where Conduit will be initialized", - ) - - return initCmd -} - -func buildPipelinesCmd() *cobra.Command { - pipelinesCmd := &cobra.Command{ - Use: "pipelines", - Short: "Initialize and manage pipelines", - Args: cobra.NoArgs, - GroupID: "pipelines", - } - - pipelinesCmd.AddCommand(buildPipelinesInitCmd()) - - return pipelinesCmd -} - -func buildPipelinesInitCmd() *cobra.Command { - pipelinesInitCmd := &cobra.Command{ - Use: "init [pipeline-name]", - Short: "Initialize an example pipeline.", - Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors -initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then -a simple and runnable generator-to-log pipeline is configured.`, - Args: cobra.MaximumNArgs(1), - Example: " conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --path pipelines/pg-to-kafka.yaml", - RunE: func(cmd *cobra.Command, args []string) error { - if len(args) > 0 { - pipelinesInitArgs.Name = args[0] - } - return NewPipelinesInit(pipelinesInitArgs).Run() - }, - } - - // Add flags to pipelines init command - pipelinesInitCmd.Flags().StringVar( - &pipelinesInitArgs.Source, - "source", - "", - "Source connector (any of the built-in connectors).", - ) - pipelinesInitCmd.Flags().StringVar( - &pipelinesInitArgs.Destination, - "destination", - "", - "Destination connector (any of the built-in connectors).", - ) - pipelinesInitCmd.Flags().StringVar( - &pipelinesInitArgs.Path, - "pipelines.path", - "./pipelines", - "Path where the pipeline will be saved.", - ) - - return pipelinesInitCmd -} diff --git a/cmd/cli/cli_test.go b/cmd/cli/cli_test.go deleted file mode 100644 index 7bca0b0d1..000000000 --- a/cmd/cli/cli_test.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cli - -import ( - "bytes" - "strings" - "testing" - - "github.com/conduitio/conduit/pkg/conduit" -) - -func TestBuildRootCmd_HelpOutput(t *testing.T) { - cmd := buildRootCmd() - - var buf bytes.Buffer - cmd.SetOut(&buf) - cmd.SetArgs([]string{"--help"}) - - err := cmd.Execute() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - output := buf.String() - - expectedFlags := []string{ - "db.type", - "db.badger.path", - "db.postgres.connection-string", - "db.postgres.table", - "db.sqlite.path", - "db.sqlite.table", - "dev.blockprofileblockprofile", - "dev.cpuprofile", - "dev.memprofile", - "api.enabled", - "http.address", - "grpc.address", - "log.level", - "log.format", - "connectors.path", - "processors.path", - "pipelines.path", - "pipelines.exit-on-degraded", - "pipelines.error-recovery.min-delay", - "pipelines.error-recovery.max-delay", - "pipelines.error-recovery.backoff-factor", - "pipelines.error-recovery.max-retries", - "pipelines.error-recovery.max-retries-window", - "schema-registry.type", - "schema-registry.confluent.connection-string", - "preview.pipeline-arch-v2", - } - - unexpectedFlags := []string{ - conduit.FlagPipelinesExitOnError, //nolint:staticcheck // this will be completely removed before Conduit 1.0 - } - - for _, flag := range expectedFlags { - if !strings.Contains(output, flag) { - t.Errorf("expected flag %q not found in help output", flag) - } - } - - for _, flag := range unexpectedFlags { - if strings.Contains(output, flag) { - t.Errorf("unexpected flag %q found in help output", flag) - } - } -} diff --git a/cmd/cli/conduit_init.go b/cmd/cli/conduit_init.go deleted file mode 100644 index 40ece5351..000000000 --- a/cmd/cli/conduit_init.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cli - -import ( - "flag" - "fmt" - "os" - "path/filepath" - - "github.com/conduitio/conduit/cmd/cli/internal" - "github.com/conduitio/conduit/pkg/conduit" - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/yaml/v3" -) - -type InitArgs struct { - Path string -} - -type ConduitInit struct { - args InitArgs -} - -func NewConduitInit(args InitArgs) *ConduitInit { - return &ConduitInit{args: args} -} - -func (i *ConduitInit) Run() error { - err := i.createDirs() - if err != nil { - return err - } - - err = i.createConfigYAML() - if err != nil { - return fmt.Errorf("failed to create config YAML: %w", err) - } - - fmt.Println(` -Conduit has been initialized! - -To quickly create an example pipeline, run 'conduit pipelines init'. -To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`) - - return nil -} - -func (i *ConduitInit) createConfigYAML() error { - cfgYAML := internal.NewYAMLTree() - i.conduitCfgFlags().VisitAll(func(f *flag.Flag) { - if conduit.HiddenFlags[f.Name] { - return // hide flag from output - } - cfgYAML.Insert(f.Name, f.DefValue, f.Usage) - }) - - yamlData, err := yaml.Marshal(cfgYAML.Root) - if err != nil { - return cerrors.Errorf("error marshaling YAML: %w\n", err) - } - - path := filepath.Join(i.path(), "conduit.yaml") - err = os.WriteFile(path, yamlData, 0o600) - if err != nil { - return cerrors.Errorf("error writing conduit.yaml: %w", err) - } - fmt.Printf("Configuration file written to %v\n", path) - - return nil -} - -func (i *ConduitInit) createDirs() error { - dirs := []string{"processors", "connectors", "pipelines"} - - for _, dir := range dirs { - path := filepath.Join(i.path(), dir) - - // Attempt to create the directory, skipping if it already exists - if err := os.Mkdir(path, os.ModePerm); err != nil { - if os.IsExist(err) { - fmt.Printf("Directory '%s' already exists, skipping...\n", path) - continue - } - return fmt.Errorf("failed to create directory '%s': %w", path, err) - } - - fmt.Printf("Created directory: %s\n", path) - } - - return nil -} - -func (i *ConduitInit) conduitCfgFlags() *flag.FlagSet { - cfg := conduit.DefaultConfigWithBasePath(i.path()) - return conduit.Flags(&cfg) -} - -func (i *ConduitInit) path() string { - if i.args.Path != "" { - return i.args.Path - } - - path, err := os.Getwd() - if err != nil { - panic(cerrors.Errorf("failed to get current working directory: %w", err)) - } - - return path -} diff --git a/cmd/cli/pipelines_init.go b/cmd/cli/pipelines_init.go deleted file mode 100644 index 8b27c420b..000000000 --- a/cmd/cli/pipelines_init.go +++ /dev/null @@ -1,354 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cli - -import ( - _ "embed" - "fmt" - "log" - "os" - "path/filepath" - "strings" - "text/template" - - "github.com/conduitio/conduit-commons/config" - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/conduit/pkg/plugin" - "github.com/conduitio/conduit/pkg/plugin/connector/builtin" -) - -//go:embed pipeline.tmpl -var pipelineCfgTmpl string - -var funcMap = template.FuncMap{ - "formatParameterValueTable": formatParameterValueTable, - "formatParameterValueYAML": formatParameterValueYAML, - "formatParameterDescriptionYAML": formatParameterDescriptionYAML, - "formatParameterRequired": formatParameterRequired, -} - -func formatParameterRequired(param config.Parameter) string { - for _, v := range param.Validations { - if v.Type() == config.ValidationTypeRequired { - return "Required" - } - } - - return "Optional" -} - -// formatParameterValue formats the value of a configuration parameter. -func formatParameterValueTable(value string) string { - switch { - case value == "": - return `` - case strings.Contains(value, "\n"): - // specifically used in the javascript processor - return fmt.Sprintf("\n```js\n%s\n```\n", value) - default: - return fmt.Sprintf("`%s`", value) - } -} - -func formatParameterDescriptionYAML(description string) string { - const ( - indentLen = 10 - prefix = "# " - lineLen = 80 - tmpNewLine = "〠" - ) - - // remove markdown new lines - description = strings.ReplaceAll(description, "\n\n", tmpNewLine) - description = strings.ReplaceAll(description, "\n", " ") - description = strings.ReplaceAll(description, tmpNewLine, "\n") - - formattedDescription := formatMultiline(description, strings.Repeat(" ", indentLen)+prefix, lineLen) - // remove first indent and last new line - formattedDescription = formattedDescription[indentLen : len(formattedDescription)-1] - return formattedDescription -} - -func formatMultiline( - input string, - prefix string, - maxLineLen int, -) string { - textLen := maxLineLen - len(prefix) - - // split the input into lines of length textLen - lines := strings.Split(input, "\n") - var formattedLines []string - for _, line := range lines { - if len(line) <= textLen { - formattedLines = append(formattedLines, line) - continue - } - - // split the line into multiple lines, don't break words - words := strings.Fields(line) - var formattedLine string - for _, word := range words { - if len(formattedLine)+len(word) > textLen { - formattedLines = append(formattedLines, formattedLine[1:]) - formattedLine = "" - } - formattedLine += " " + word - } - if formattedLine != "" { - formattedLines = append(formattedLines, formattedLine[1:]) - } - } - - // combine lines including indent and prefix - var formatted string - for _, line := range formattedLines { - formatted += prefix + line + "\n" - } - - return formatted -} - -func formatParameterValueYAML(value string) string { - switch { - case value == "": - return `""` - case strings.Contains(value, "\n"): - // specifically used in the javascript processor - formattedValue := formatMultiline(value, " ", 10000) - return fmt.Sprintf("|\n%s", formattedValue) - default: - return fmt.Sprintf(`'%s'`, value) - } -} - -const ( - defaultDestination = "file" - defaultSource = "generator" -) - -type pipelineTemplate struct { - Name string - SourceSpec connectorTemplate - DestinationSpec connectorTemplate -} - -type connectorTemplate struct { - Name string - Params config.Parameters -} - -type PipelinesInitArgs struct { - Name string - Source string - Destination string - Path string -} - -type PipelinesInit struct { - args PipelinesInitArgs -} - -func NewPipelinesInit(args PipelinesInitArgs) *PipelinesInit { - return &PipelinesInit{args: args} -} - -func (pi *PipelinesInit) Run() error { - var pipeline pipelineTemplate - // if no source/destination arguments are provided, - // we build a runnable example pipeline - if pi.args.Source == "" && pi.args.Destination == "" { - pipeline = pi.buildDemoPipeline() - } else { - p, err := pi.buildTemplatePipeline() - if err != nil { - return err - } - pipeline = p - } - - err := pi.write(pipeline) - if err != nil { - return cerrors.Errorf("could not write pipeline: %w", err) - } - - fmt.Printf(`Your pipeline has been initialized and created at %s. - -To run the pipeline, simply run 'conduit'.`, pi.configFilePath()) - - return nil -} - -func (pi *PipelinesInit) buildTemplatePipeline() (pipelineTemplate, error) { - srcParams, err := pi.getSourceParams() - if err != nil { - return pipelineTemplate{}, cerrors.Errorf("failed getting source params: %w", err) - } - - dstParams, err := pi.getDestinationParams() - if err != nil { - return pipelineTemplate{}, cerrors.Errorf("failed getting destination params: %w", err) - } - - return pipelineTemplate{ - Name: pi.pipelineName(), - SourceSpec: srcParams, - DestinationSpec: dstParams, - }, nil -} - -func (pi *PipelinesInit) buildDemoPipeline() pipelineTemplate { - srcParams, _ := pi.getSourceParams() - dstParams, _ := pi.getDestinationParams() - - return pipelineTemplate{ - Name: pi.pipelineName(), - SourceSpec: connectorTemplate{ - Name: defaultSource, - Params: map[string]config.Parameter{ - "format.type": { - Description: srcParams.Params["format.type"].Description, - Type: srcParams.Params["format.type"].Type, - Default: "structured", - Validations: srcParams.Params["format.type"].Validations, - }, - "format.options.scheduledDeparture": { - Description: "Generate field 'scheduledDeparture' of type 'time'", - Type: config.ParameterTypeString, - Default: "time", - }, - "format.options.airline": { - Description: "Generate field 'airline' of type string", - Type: config.ParameterTypeString, - Default: "string", - }, - "rate": { - Description: srcParams.Params["rate"].Description, - Type: srcParams.Params["rate"].Type, - Default: "1", - }, - }, - }, - DestinationSpec: connectorTemplate{ - Name: defaultDestination, - Params: map[string]config.Parameter{ - "path": { - Description: dstParams.Params["path"].Description, - Type: dstParams.Params["path"].Type, - Default: "./destination.txt", - }, - }, - }, - } -} - -func (pi *PipelinesInit) getOutput() *os.File { - output, err := os.OpenFile(pi.configFilePath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) - if err != nil { - log.Fatalf("error: failed to open %s: %v", pi.args.Path, err) - } - - return output -} - -func (pi *PipelinesInit) write(pipeline pipelineTemplate) error { - t, err := template.New("").Funcs(funcMap).Option("missingkey=zero").Parse(pipelineCfgTmpl) - if err != nil { - return cerrors.Errorf("failed parsing template: %w", err) - } - - output := pi.getOutput() - defer output.Close() - - err = t.Execute(output, pipeline) - if err != nil { - return cerrors.Errorf("failed executing template: %w", err) - } - - return nil -} - -func (pi *PipelinesInit) getSourceParams() (connectorTemplate, error) { - for _, conn := range builtin.DefaultBuiltinConnectors { - specs := conn.NewSpecification() - if specs.Name == pi.sourceConnector() || specs.Name == "builtin:"+pi.sourceConnector() { - if conn.NewSource == nil { - return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.sourceConnector()) - } - - return connectorTemplate{ - Name: specs.Name, - Params: conn.NewSource().Parameters(), - }, nil - } - } - - return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.sourceConnector(), plugin.ErrPluginNotFound) -} - -func (pi *PipelinesInit) getDestinationParams() (connectorTemplate, error) { - for _, conn := range builtin.DefaultBuiltinConnectors { - specs := conn.NewSpecification() - if specs.Name == pi.destinationConnector() || specs.Name == "builtin:"+pi.destinationConnector() { - if conn.NewDestination == nil { - return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.destinationConnector()) - } - - return connectorTemplate{ - Name: specs.Name, - Params: conn.NewDestination().Parameters(), - }, nil - } - } - - return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.destinationConnector(), plugin.ErrPluginNotFound) -} - -func (pi *PipelinesInit) configFilePath() string { - path := pi.args.Path - if path == "" { - path = "./pipelines" - } - - return filepath.Join(path, pi.configFileName()) -} - -func (pi *PipelinesInit) configFileName() string { - return fmt.Sprintf("pipeline-%s.yaml", pi.pipelineName()) -} - -func (pi *PipelinesInit) sourceConnector() string { - if pi.args.Source != "" { - return pi.args.Source - } - - return defaultSource -} - -func (pi *PipelinesInit) destinationConnector() string { - if pi.args.Destination != "" { - return pi.args.Destination - } - - return defaultDestination -} - -func (pi *PipelinesInit) pipelineName() string { - if pi.args.Name != "" { - return pi.args.Name - } - - return fmt.Sprintf("%s-to-%s", pi.sourceConnector(), pi.destinationConnector()) -} diff --git a/cmd/cli/internal/yaml.go b/cmd/conduit/internal/yaml.go similarity index 100% rename from cmd/cli/internal/yaml.go rename to cmd/conduit/internal/yaml.go diff --git a/cmd/conduit/main.go b/cmd/conduit/main.go index a744d7f3e..0a28295a6 100644 --- a/cmd/conduit/main.go +++ b/cmd/conduit/main.go @@ -15,9 +15,22 @@ package main import ( - "github.com/conduitio/conduit/cmd/cli" + "fmt" + "os" + + "github.com/conduitio/conduit/cmd/conduit/root" + "github.com/conduitio/ecdysis" ) func main() { - cli.New().Run() + e := ecdysis.New() + + cmd := e.MustBuildCobraCommand(&root.RootCommand{}) + cmd.CompletionOptions.DisableDefaultCmd = true + + if err := cmd.Execute(); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + os.Exit(0) } diff --git a/cmd/conduit/root/init.go b/cmd/conduit/root/init.go new file mode 100644 index 000000000..365d61f12 --- /dev/null +++ b/cmd/conduit/root/init.go @@ -0,0 +1,147 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "fmt" + "os" + "path/filepath" + "reflect" + + "github.com/conduitio/conduit/cmd/conduit/internal" + "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/ecdysis" + "github.com/conduitio/yaml/v3" +) + +var ( + _ ecdysis.CommandWithExecute = (*InitCommand)(nil) + _ ecdysis.CommandWithDocs = (*InitCommand)(nil) +) + +type InitCommand struct { + cfg *conduit.Config + rootFlags *RootFlags +} + +func (c *InitCommand) Usage() string { return "init" } + +func (c *InitCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: `Initialize Conduit with a configuration file and directories.`, + } +} + +func (c *InitCommand) createDirs() error { + // These could be used based on the root flags if those were global + dirs := []string{"processors", "connectors", "pipelines"} + conduitPath := filepath.Dir(c.rootFlags.ConduitConfigPath) + + for _, dir := range dirs { + path := filepath.Join(conduitPath, dir) + + // Attempt to create the directory, skipping if it already exists + if err := os.Mkdir(path, os.ModePerm); err != nil { + if os.IsExist(err) { + fmt.Printf("Directory '%s' already exists, skipping...\n", path) + continue + } + return fmt.Errorf("failed to create directory '%s': %w", path, err) + } + + fmt.Printf("Created directory: %s\n", path) + } + + return nil +} + +func (c *InitCommand) createConfigYAML() error { + cfgYAML := internal.NewYAMLTree() + + v := reflect.Indirect(reflect.ValueOf(c.cfg)) + t := v.Type() + + for i := 0; i < v.NumField(); i++ { + field := t.Field(i) + fieldValue := v.Field(i) + + if fieldValue.Kind() == reflect.Struct { + embedStructYAML(fieldValue, field, cfgYAML) + } else { + value := fmt.Sprintf("%v", fieldValue.Interface()) + usage := field.Tag.Get("usage") + longName := field.Tag.Get("long") + + if longName != "" { + cfgYAML.Insert(longName, value, usage) + } + } + } + + yamlData, err := yaml.Marshal(cfgYAML.Root) + if err != nil { + return cerrors.Errorf("error marshaling YAML: %w\n", err) + } + + err = os.WriteFile(c.rootFlags.ConduitConfigPath, yamlData, 0o600) + if err != nil { + return cerrors.Errorf("error writing conduit.yaml: %w", err) + } + fmt.Printf("Configuration file written to %v\n", c.rootFlags.ConduitConfigPath) + + return nil +} + +func embedStructYAML(v reflect.Value, field reflect.StructField, cfgYAML *internal.YAMLTree) { + t := v.Type() + for i := 0; i < v.NumField(); i++ { + subField := t.Field(i) + subFieldValue := v.Field(i) + + if subFieldValue.Kind() == reflect.Struct { + embedStructYAML(subFieldValue, subField, cfgYAML) + } else { + value := fmt.Sprintf("%v", subFieldValue.Interface()) + usage := subField.Tag.Get("usage") + longName := subField.Tag.Get("long") + + if longName != "" { + cfgYAML.Insert(longName, value, usage) + } + } + } +} + +func (c *InitCommand) Execute(ctx context.Context) error { + err := c.createDirs() + if err != nil { + return err + } + + err = c.createConfigYAML() + if err != nil { + return fmt.Errorf("failed to create config YAML: %w", err) + } + + fmt.Println(` + Conduit has been initialized! + + To quickly create an example pipeline, run 'conduit pipelines init'. + To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`) + + return nil +} diff --git a/cmd/conduit/root/pipelines/format_parameter.go b/cmd/conduit/root/pipelines/format_parameter.go new file mode 100644 index 000000000..38c5e7d33 --- /dev/null +++ b/cmd/conduit/root/pipelines/format_parameter.go @@ -0,0 +1,125 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "fmt" + "strings" + "text/template" + + "github.com/conduitio/conduit-commons/config" +) + +var funcMap = template.FuncMap{ + "formatParameterValueTable": formatParameterValueTable, + "formatParameterValueYAML": formatParameterValueYAML, + "formatParameterDescriptionYAML": formatParameterDescriptionYAML, + "formatParameterRequired": formatParameterRequired, +} + +func formatParameterRequired(param config.Parameter) string { + for _, v := range param.Validations { + if v.Type() == config.ValidationTypeRequired { + return "Required" + } + } + + return "Optional" +} + +// formatParameterValue formats the value of a configuration parameter. +func formatParameterValueTable(value string) string { + switch { + case value == "": + return `` + case strings.Contains(value, "\n"): + // specifically used in the javascript processor + return fmt.Sprintf("\n```js\n%s\n```\n", value) + default: + return fmt.Sprintf("`%s`", value) + } +} + +func formatParameterDescriptionYAML(description string) string { + const ( + indentLen = 10 + prefix = "# " + lineLen = 80 + tmpNewLine = "〠" + ) + + // remove markdown new lines + description = strings.ReplaceAll(description, "\n\n", tmpNewLine) + description = strings.ReplaceAll(description, "\n", " ") + description = strings.ReplaceAll(description, tmpNewLine, "\n") + + formattedDescription := formatMultiline(description, strings.Repeat(" ", indentLen)+prefix, lineLen) + // remove first indent and last new line + formattedDescription = formattedDescription[indentLen : len(formattedDescription)-1] + return formattedDescription +} + +func formatMultiline( + input string, + prefix string, + maxLineLen int, +) string { + textLen := maxLineLen - len(prefix) + + // split the input into lines of length textLen + lines := strings.Split(input, "\n") + var formattedLines []string + for _, line := range lines { + if len(line) <= textLen { + formattedLines = append(formattedLines, line) + continue + } + + // split the line into multiple lines, don't break words + words := strings.Fields(line) + var formattedLine string + for _, word := range words { + if len(formattedLine)+len(word) > textLen { + formattedLines = append(formattedLines, formattedLine[1:]) + formattedLine = "" + } + formattedLine += " " + word + } + if formattedLine != "" { + formattedLines = append(formattedLines, formattedLine[1:]) + } + } + + // combine lines including indent and prefix + var formatted string + for _, line := range formattedLines { + formatted += prefix + line + "\n" + } + + return formatted +} + +func formatParameterValueYAML(value string) string { + switch { + case value == "": + return `""` + case strings.Contains(value, "\n"): + // specifically used in the javascript processor + formattedValue := formatMultiline(value, " ", 10000) + return fmt.Sprintf("|\n%s", formattedValue) + default: + return fmt.Sprintf(`'%s'`, value) + } +} diff --git a/cmd/conduit/root/pipelines/init.go b/cmd/conduit/root/pipelines/init.go new file mode 100644 index 000000000..9c59be9a8 --- /dev/null +++ b/cmd/conduit/root/pipelines/init.go @@ -0,0 +1,253 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "context" + _ "embed" + "fmt" + "log" + "os" + "path/filepath" + "text/template" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/connector/builtin" + "github.com/conduitio/ecdysis" +) + +var ( + _ ecdysis.CommandWithDocs = (*InitCommand)(nil) + _ ecdysis.CommandWithFlags = (*InitCommand)(nil) + _ ecdysis.CommandWithArgs = (*InitCommand)(nil) + _ ecdysis.CommandWithExecute = (*InitCommand)(nil) + + //go:embed pipeline.tmpl + pipelineCfgTmpl string +) + +const ( + defaultSource = "generator" + defaultDestination = "file" +) + +type InitArgs struct { + name string +} + +type InitFlags struct { + Source string `long:"source" usage:"Source connector (any of the built-in connectors)." default:"generator"` + Destination string `long:"destination" usage:"Destination connector (any of the built-in connectors)." default:"file"` + PipelinesPath string `long:"pipelines.path" usage:"Path where the pipeline will be saved." default:"./pipelines"` +} + +type InitCommand struct { + args InitArgs + flags InitFlags + configFilePath string +} + +func (c *InitCommand) Flags() []ecdysis.Flag { + flags := ecdysis.BuildFlags(&c.flags) + + currentPath, err := os.Getwd() + if err != nil { + panic(cerrors.Errorf("failed to get current working directory: %w", err)) + } + + flags.SetDefault("pipelines.path", filepath.Join(currentPath, "./pipelines")) + flags.SetDefault("source", defaultSource) + flags.SetDefault("destination", defaultDestination) + + return flags +} + +func (c *InitCommand) Args(args []string) error { + if len(args) == 0 { + return cerrors.Errorf("requires a pipeline name") + } + + if len(args) > 1 { + return cerrors.Errorf("too many arguments") + } + c.args.name = args[0] + return nil +} + +func (c *InitCommand) Usage() string { return "init" } + +func (c *InitCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Initialize an example pipeline.", + Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors +initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then +a simple and runnable generator-to-log pipeline is configured.`, + Example: "conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --pipelines.path pipelines/pg-to-kafka.yaml", + } +} + +func (c *InitCommand) getSourceSpec() (connectorSpec, error) { + for _, conn := range builtin.DefaultBuiltinConnectors { + specs := conn.NewSpecification() + if specs.Name == c.flags.Source || specs.Name == "builtin:"+c.flags.Source { + if conn.NewSource == nil { + return connectorSpec{}, cerrors.Errorf("plugin %v has no source", c.flags.Source) + } + + return connectorSpec{ + Name: specs.Name, + Params: conn.NewSource().Parameters(), + }, nil + } + } + + return connectorSpec{}, cerrors.Errorf("%v: %w", c.flags.Source, plugin.ErrPluginNotFound) +} + +func (c *InitCommand) getDestinationSpec() (connectorSpec, error) { + for _, conn := range builtin.DefaultBuiltinConnectors { + specs := conn.NewSpecification() + if specs.Name == c.flags.Destination || specs.Name == "builtin:"+c.flags.Destination { + if conn.NewDestination == nil { + return connectorSpec{}, cerrors.Errorf("plugin %v has no source", c.flags.Destination) + } + + return connectorSpec{ + Name: specs.Name, + Params: conn.NewDestination().Parameters(), + }, nil + } + } + return connectorSpec{}, cerrors.Errorf("%v: %w", c.flags.Destination, plugin.ErrPluginNotFound) +} + +// getDemoSourceGeneratorSpec returns a simplified version of the source generator connector. +func (c *InitCommand) getDemoSourceGeneratorSpec(spec connectorSpec) connectorSpec { + return connectorSpec{ + Name: defaultSource, + Params: map[string]config.Parameter{ + "format.type": { + Description: spec.Params["format.type"].Description, + Type: spec.Params["format.type"].Type, + Default: "structured", + Validations: spec.Params["format.type"].Validations, + }, + "format.options.scheduledDeparture": { + Description: "Generate field 'scheduledDeparture' of type 'time'", + Type: config.ParameterTypeString, + Default: "time", + }, + "format.options.airline": { + Description: "Generate field 'airline' of type string", + Type: config.ParameterTypeString, + Default: "string", + }, + "rate": { + Description: spec.Params["rate"].Description, + Type: spec.Params["rate"].Type, + Default: "1", + }, + }, + } +} + +// getDemoDestinationFileSpec returns a simplified version of the destination file connector. +func (c *InitCommand) getDemoDestinationFileSpec(spec connectorSpec) connectorSpec { + return connectorSpec{ + Name: defaultDestination, + Params: map[string]config.Parameter{ + "path": { + Description: spec.Params["path"].Description, + Type: spec.Params["path"].Type, + Default: "./destination.txt", + }, + }, + } +} + +func (c *InitCommand) buildTemplatePipeline() (pipelineTemplate, error) { + srcSpec, err := c.getSourceSpec() + if err != nil { + return pipelineTemplate{}, cerrors.Errorf("failed getting source params: %w", err) + } + + // provide a simplified version + if c.flags.Source == defaultSource { + srcSpec = c.getDemoSourceGeneratorSpec(srcSpec) + } + + dstSpec, err := c.getDestinationSpec() + if err != nil { + return pipelineTemplate{}, cerrors.Errorf("failed getting destination params: %w", err) + } + + // provide a simplified version + if c.flags.Destination == defaultDestination { + dstSpec = c.getDemoDestinationFileSpec(dstSpec) + } + + return pipelineTemplate{ + Name: c.args.name, + SourceSpec: srcSpec, + DestinationSpec: dstSpec, + }, nil +} + +func (c *InitCommand) getOutput() *os.File { + output, err := os.OpenFile(c.configFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + log.Fatalf("error: failed to open %s: %v", c.configFilePath, err) + } + + return output +} + +func (c *InitCommand) write(pipeline pipelineTemplate) error { + t, err := template.New("").Funcs(funcMap).Option("missingkey=zero").Parse(pipelineCfgTmpl) + if err != nil { + return cerrors.Errorf("failed parsing template: %w", err) + } + + output := c.getOutput() + defer output.Close() + + err = t.Execute(output, pipeline) + if err != nil { + return cerrors.Errorf("failed executing template: %w", err) + } + + return nil +} + +func (c *InitCommand) Execute(_ context.Context) error { + c.configFilePath = filepath.Join(c.flags.PipelinesPath, fmt.Sprintf("pipeline-%s.yaml", c.args.name)) + + pipeline, err := c.buildTemplatePipeline() + if err != nil { + return err + } + + if err := c.write(pipeline); err != nil { + return cerrors.Errorf("could not write pipeline: %w", err) + } + + fmt.Printf(`Your pipeline has been initialized and created at %s. + +To run the pipeline, simply run 'conduit'.`, c.configFilePath) + + return nil +} diff --git a/cmd/cli/pipeline.tmpl b/cmd/conduit/root/pipelines/pipeline.tmpl similarity index 100% rename from cmd/cli/pipeline.tmpl rename to cmd/conduit/root/pipelines/pipeline.tmpl diff --git a/cmd/conduit/root/pipelines/pipelines.go b/cmd/conduit/root/pipelines/pipelines.go new file mode 100644 index 000000000..62dbcc91d --- /dev/null +++ b/cmd/conduit/root/pipelines/pipelines.go @@ -0,0 +1,40 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "github.com/conduitio/ecdysis" +) + +var ( + _ ecdysis.CommandWithDocs = (*PipelinesCommand)(nil) + _ ecdysis.CommandWithSubCommands = (*PipelinesCommand)(nil) +) + +type PipelinesCommand struct{} + +func (c *PipelinesCommand) SubCommands() []ecdysis.Command { + return []ecdysis.Command{ + &InitCommand{}, + } +} + +func (c *PipelinesCommand) Usage() string { return "pipelines" } + +func (c *PipelinesCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Initialize and manage pipelines", + } +} diff --git a/cmd/conduit/root/pipelines/template.go b/cmd/conduit/root/pipelines/template.go new file mode 100644 index 000000000..d7a707309 --- /dev/null +++ b/cmd/conduit/root/pipelines/template.go @@ -0,0 +1,28 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import "github.com/conduitio/conduit-commons/config" + +type connectorSpec struct { + Name string + Params config.Parameters +} + +type pipelineTemplate struct { + Name string + SourceSpec connectorSpec + DestinationSpec connectorSpec +} diff --git a/cmd/conduit/root/root.go b/cmd/conduit/root/root.go new file mode 100644 index 000000000..f68c9a58e --- /dev/null +++ b/cmd/conduit/root/root.go @@ -0,0 +1,193 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/conduitio/conduit/cmd/conduit/root/pipelines" + "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/ecdysis" + "github.com/spf13/viper" +) + +var ( + _ ecdysis.CommandWithFlags = (*RootCommand)(nil) + _ ecdysis.CommandWithExecute = (*RootCommand)(nil) + _ ecdysis.CommandWithDocs = (*RootCommand)(nil) + _ ecdysis.CommandWithSubCommands = (*RootCommand)(nil) +) + +const ConduitPrefix = "CONDUIT" + +type RootFlags struct { + // Global flags ----------------------------------------------------------- + + // Conduit configuration file + ConduitConfigPath string `long:"config.path" usage:"global conduit configuration file" persistent:"true" default:"./conduit.yaml"` + + // Version + Version bool `long:"version" short:"v" usage:"show current Conduit version" persistent:"true"` + + conduit.Config +} + +type RootCommand struct { + flags RootFlags + cfg conduit.Config +} + +func (c *RootCommand) updateConfig() error { + v := viper.New() + + // Set default values + v.SetDefault("config.path", c.flags.ConduitConfigPath) + v.SetDefault("db.type", c.flags.DB.Type) + v.SetDefault("api.enabled", c.flags.API.Enabled) + v.SetDefault("log.level", c.flags.Log.Level) + v.SetDefault("log.format", c.flags.Log.Format) + v.SetDefault("connectors.path", c.flags.Connectors.Path) + v.SetDefault("processors.path", c.flags.Processors.Path) + v.SetDefault("pipelines.path", c.flags.Pipelines.Path) + v.SetDefault("pipelines.exit-on-degraded", c.flags.Pipelines.ExitOnDegraded) + v.SetDefault("pipelines.error-recovery.min-delay", c.flags.Pipelines.ErrorRecovery.MinDelay) + v.SetDefault("pipelines.error-recovery.max-delay", c.flags.Pipelines.ErrorRecovery.MaxDelay) + v.SetDefault("pipelines.error-recovery.backoff-factor", c.flags.Pipelines.ErrorRecovery.BackoffFactor) + v.SetDefault("pipelines.error-recovery.max-retries", c.flags.Pipelines.ErrorRecovery.MaxRetries) + v.SetDefault("pipelines.error-recovery.max-retries-window", c.flags.Pipelines.ErrorRecovery.MaxRetriesWindow) + v.SetDefault("schema-registry.type", c.flags.SchemaRegistry.Type) + v.SetDefault("schema-registry.confluent.connection-string", c.flags.SchemaRegistry.Confluent.ConnectionString) + v.SetDefault("preview.pipeline-arch-v2", c.flags.Preview.PipelineArchV2) + v.SetDefault("dev.cpuprofile", c.flags.Dev.CPUProfile) + v.SetDefault("dev.memprofile", c.flags.Dev.MemProfile) + v.SetDefault("dev.blockprofile", c.flags.Dev.BlockProfile) + + // Read configuration from file + v.SetConfigFile(c.flags.ConduitConfigPath) + + // ignore if file doesn't exist. Maybe we could check if user is trying read from a file that doesn't exist. + _ = v.ReadInConfig() + + // Set environment variable prefix and automatic mapping + v.SetEnvPrefix(ConduitPrefix) + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + v.AutomaticEnv() + + v.BindEnv("db.type") + v.BindEnv("db.badger.path") + v.BindEnv("db.postgres.connection-string") + v.BindEnv("db.postgres.table") + v.BindEnv("db.sqlite.path") + v.BindEnv("db.sqlite.table") + v.BindEnv("api.enabled") + v.BindEnv("http.address") + v.BindEnv("grpc.address") + v.BindEnv("log.level") + v.BindEnv("log.format") + v.BindEnv("connectors.path") + v.BindEnv("processors.path") + v.BindEnv("pipelines.path") + v.BindEnv("pipelines.exit-on-degraded") + v.BindEnv("pipelines.error-recovery.min-delay") + v.BindEnv("pipelines.error-recovery.max-delay") + v.BindEnv("pipelines.error-recovery.backoff-factor") + v.BindEnv("pipelines.error-recovery.max-retries") + v.BindEnv("pipelines.error-recovery.max-retries-window") + v.BindEnv("schema-registry.type") + v.BindEnv("schema-registry.confluent.connection-string") + v.BindEnv("preview.pipeline-arch-v2") + v.BindEnv("dev.cpuprofile") + v.BindEnv("dev.memprofile") + v.BindEnv("dev.blockprofile") + + if err := v.Unmarshal(&c.cfg); err != nil { + return fmt.Errorf("unable to unmarshal the configuration: %w", err) + } + + return nil +} + +func (c *RootCommand) Execute(_ context.Context) error { + if c.flags.Version { + _, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true)) + return nil + } + + if err := c.updateConfig(); err != nil { + return err + } + + e := &conduit.Entrypoint{} + e.Serve(c.cfg) + return nil +} + +func (c *RootCommand) Usage() string { return "conduit" } +func (c *RootCommand) Flags() []ecdysis.Flag { + flags := ecdysis.BuildFlags(&c.flags) + + currentPath, err := os.Getwd() + if err != nil { + panic(cerrors.Errorf("failed to get current working directory: %w", err)) + } + c.cfg = conduit.DefaultConfigWithBasePath(currentPath) + + conduitConfigPath := filepath.Join(currentPath, "conduit.yaml") + flags.SetDefault("config.path", conduitConfigPath) + flags.SetDefault("db.type", c.cfg.DB.Type) + flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path) + flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString) + flags.SetDefault("db.postgres.table", c.cfg.DB.Postgres.Table) + flags.SetDefault("db.sqlite.path", c.cfg.DB.SQLite.Path) + flags.SetDefault("db.sqlite.table", c.cfg.DB.SQLite.Table) + flags.SetDefault("api.enabled", c.cfg.API.Enabled) + flags.SetDefault("http.address", c.cfg.API.HTTP.Address) + flags.SetDefault("grpc.address", c.cfg.API.GRPC.Address) + flags.SetDefault("log.level", c.cfg.Log.Level) + flags.SetDefault("log.format", c.cfg.Log.Format) + flags.SetDefault("connectors.path", c.cfg.Connectors.Path) + flags.SetDefault("processors.path", c.cfg.Processors.Path) + flags.SetDefault("pipelines.path", c.cfg.Pipelines.Path) + flags.SetDefault("pipelines.exit-on-degraded", c.cfg.Pipelines.ExitOnDegraded) + flags.SetDefault("pipelines.error-recovery.min-delay", c.cfg.Pipelines.ErrorRecovery.MinDelay) + flags.SetDefault("pipelines.error-recovery.max-delay", c.cfg.Pipelines.ErrorRecovery.MaxDelay) + flags.SetDefault("pipelines.error-recovery.backoff-factor", c.cfg.Pipelines.ErrorRecovery.BackoffFactor) + flags.SetDefault("pipelines.error-recovery.max-retries", c.cfg.Pipelines.ErrorRecovery.MaxRetries) + flags.SetDefault("pipelines.error-recovery.max-retries-window", c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow) + flags.SetDefault("schema-registry.type", c.cfg.SchemaRegistry.Type) + flags.SetDefault("schema-registry.confluent.connection-string", c.cfg.SchemaRegistry.Confluent.ConnectionString) + flags.SetDefault("preview.pipeline-arch-v2", c.cfg.Preview.PipelineArchV2) + + return flags +} + +func (c *RootCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Conduit CLI", + Long: `Conduit CLI is a command-line that helps you interact with and manage Conduit.`, + } +} + +func (c *RootCommand) SubCommands() []ecdysis.Command { + return []ecdysis.Command{ + &InitCommand{cfg: &c.cfg, rootFlags: &c.flags}, + &pipelines.PipelinesCommand{}, + } +} diff --git a/cmd/conduit/root/root_test.go b/cmd/conduit/root/root_test.go new file mode 100644 index 000000000..4ad0127e9 --- /dev/null +++ b/cmd/conduit/root/root_test.go @@ -0,0 +1,245 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package root + +import ( + "os" + "path/filepath" + "testing" + + "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/ecdysis" + isT "github.com/matryer/is" +) + +func TestRootCommandFlags(t *testing.T) { + is := isT.New(t) + + expectedFlags := []struct { + longName string + shortName string + required bool + persistent bool + hidden bool + }{ + {longName: "config.path", persistent: true}, + {longName: "version", shortName: "v", persistent: true}, + {longName: "db.type"}, + {longName: "db.badger.path"}, + {longName: "db.postgres.connection-string"}, + {longName: "db.postgres.table"}, + {longName: "db.sqlite.path"}, + {longName: "db.sqlite.table"}, + {longName: "api.enabled"}, + {longName: "http.address"}, + {longName: "grpc.address"}, + {longName: "log.level"}, + {longName: "log.format"}, + {longName: "connectors.path"}, + {longName: "processors.path"}, + {longName: "pipelines.path"}, + {longName: "pipelines.exit-on-degraded"}, + {longName: "pipelines.error-recovery.min-delay"}, + {longName: "pipelines.error-recovery.max-delay"}, + {longName: "pipelines.error-recovery.backoff-factor"}, + {longName: "pipelines.error-recovery.max-retries"}, + {longName: "pipelines.error-recovery.max-retries-window"}, + {longName: "schema-registry.type"}, + {longName: "schema-registry.confluent.connection-string"}, + {longName: "preview.pipeline-arch-v2"}, + {longName: "dev.cpuprofile"}, + {longName: "dev.memprofile"}, + {longName: "dev.blockprofile"}, + } + + c := &RootCommand{} + flags := c.Flags() + + for _, ef := range expectedFlags { + var foundFlag *ecdysis.Flag + for _, f := range flags { + if f.Long == ef.longName { + foundFlag = &f + break + } + } + + is.True(foundFlag != nil) + + if foundFlag != nil { + is.Equal(ef.shortName, foundFlag.Short) + is.Equal(ef.required, foundFlag.Required) + is.Equal(ef.persistent, foundFlag.Persistent) + is.Equal(ef.hidden, foundFlag.Hidden) + } + } +} + +func TestRootCommand_updateConfig(t *testing.T) { + is := isT.New(t) + + tmpDir, err := os.MkdirTemp("", "config-test") + is.NoErr(err) + defer os.RemoveAll(tmpDir) + + configFileContent := ` +db: + type: "sqlite" + sqlite: + path: "/custom/path/db" +log: + level: "debug" + format: "json" +api: + enabled: false +` + + configPath := filepath.Join(tmpDir, "config.yaml") + err = os.WriteFile(configPath, []byte(configFileContent), 0644) + is.NoErr(err) + + defaultCfg := conduit.DefaultConfigWithBasePath(tmpDir) + + tests := []struct { + name string + flags *RootFlags + configFile string + envVars map[string]string + assertFunc func(*isT.I, conduit.Config) + }{ + { + name: "default values only", + flags: &RootFlags{ + ConduitConfigPath: "nonexistent.yaml", + Config: conduit.Config{ + DB: conduit.DefaultConfig().DB, + Log: conduit.DefaultConfig().Log, + API: conduit.DefaultConfig().API, + }, + }, + assertFunc: func(is *isT.I, cfg conduit.Config) { + is.Equal(cfg.DB.Type, defaultCfg.DB.Type) + is.Equal(cfg.Log.Level, defaultCfg.Log.Level) + is.Equal(cfg.API.Enabled, defaultCfg.API.Enabled) + }, + }, + { + name: "config file overrides defaults", + flags: &RootFlags{ + ConduitConfigPath: configPath, + }, + assertFunc: func(is *isT.I, cfg conduit.Config) { + is.Equal(cfg.DB.Type, "sqlite") + is.Equal(cfg.DB.SQLite.Path, "/custom/path/db") + is.Equal(cfg.Log.Level, "debug") + is.Equal(cfg.Log.Format, "json") + is.Equal(cfg.API.Enabled, false) + }, + }, + { + name: "env vars override config file", + flags: &RootFlags{ + ConduitConfigPath: configPath, + }, + envVars: map[string]string{ + "CONDUIT_DB_TYPE": "postgres", + "CONDUIT_LOG_LEVEL": "warn", + "CONDUIT_API_ENABLED": "true", + }, + assertFunc: func(is *isT.I, cfg conduit.Config) { + is.Equal(cfg.DB.Type, "postgres") + is.Equal(cfg.Log.Level, "warn") + is.Equal(cfg.API.Enabled, true) + // Config file values that weren't overridden should remain + is.Equal(cfg.Log.Format, "json") + }, + }, + { + name: "flags override everything", + flags: &RootFlags{ + ConduitConfigPath: configPath, + Config: conduit.Config{ + DB: conduit.ConfigDB{ + Type: "sqlite", + }, + Log: conduit.ConfigLog{ + Level: "error", + Format: "text", + }, + API: conduit.ConfigAPI{ + Enabled: true, + }, + }, + }, + envVars: map[string]string{ + "CONDUIT_DB_TYPE": "postgres", + "CONDUIT_LOG_LEVEL": "warn", + "CONDUIT_API_ENABLED": "false", + }, + assertFunc: func(is *isT.I, cfg conduit.Config) { + is.Equal(cfg.DB.Type, "sqlite") + is.Equal(cfg.Log.Level, "error") + is.Equal(cfg.Log.Format, "text") + is.Equal(cfg.API.Enabled, true) + }, + }, + { + name: "partial overrides", + flags: &RootFlags{ + ConduitConfigPath: configPath, + Config: conduit.Config{ + Log: conduit.ConfigLog{Level: "warn"}, + }, + }, + envVars: map[string]string{ + "CONDUIT_DB_TYPE": "postgres", + }, + assertFunc: func(is *isT.I, cfg conduit.Config) { + is.Equal(cfg.DB.Type, "postgres") + is.Equal(cfg.Log.Level, "warn") + is.Equal(cfg.Log.Format, "json") + is.Equal(cfg.API.Enabled, defaultCfg.API.Enabled) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + is := isT.New(t) + + os.Clearenv() + + for k, v := range tt.envVars { + os.Setenv(k, v) + } + defer func() { + for k := range tt.envVars { + os.Unsetenv(k) + } + }() + + c := &RootCommand{ + flags: *tt.flags, + } + + c.cfg = conduit.DefaultConfigWithBasePath(tmpDir) + + err := c.updateConfig() + is.NoErr(err) + + tt.assertFunc(is, c.cfg) + }) + } +} diff --git a/go.mod b/go.mod index 071aa3b80..84e359965 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/conduitio/conduit-connector-sdk v0.12.0 github.com/conduitio/conduit-processor-sdk v0.4.0 github.com/conduitio/conduit-schema-registry v0.2.2 + github.com/conduitio/ecdysis v0.0.0-20241104140515-1031f323f080 github.com/conduitio/yaml/v3 v3.3.0 github.com/dop251/goja v0.0.0-20240806095544-3491d4a58fbe github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c @@ -34,15 +35,13 @@ require ( github.com/jpillora/backoff v1.0.0 github.com/matryer/is v1.4.1 github.com/neilotoole/slogt v1.1.0 - github.com/peterbourgon/ff/v3 v3.4.0 github.com/piotrkowalczuk/promgrpc/v4 v4.1.4 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.1 github.com/rs/zerolog v1.33.0 github.com/sourcegraph/conc v0.3.0 - github.com/spf13/cobra v1.8.1 - github.com/spf13/pflag v1.0.5 + github.com/spf13/viper v1.19.0 github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.8.2 github.com/twmb/franz-go/pkg/sr v1.2.0 @@ -320,7 +319,8 @@ require ( github.com/sourcegraph/go-diff v0.7.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.7.0 // indirect - github.com/spf13/viper v1.19.0 // indirect + github.com/spf13/cobra v1.8.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect @@ -388,3 +388,5 @@ require ( mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect pluginrpc.com/pluginrpc v0.5.0 // indirect ) + +replace github.com/conduitio/ecdysis => ../ecdysis diff --git a/go.sum b/go.sum index c2af8e074..b0f88218c 100644 --- a/go.sum +++ b/go.sum @@ -718,8 +718,6 @@ github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= -github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc= -github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyXE/eKY9RztQ= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index aba89f144..b9523d4fd 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -39,50 +39,53 @@ const ( SchemaRegistryTypeBuiltin = "builtin" ) -// Config holds all configurable values for Conduit. -type Config struct { - DB struct { - // When Driver is specified it takes precedence over other DB related - // fields. - Driver database.DB - - Type string - Badger struct { - Path string - } - Postgres struct { - ConnectionString string - Table string - } - SQLite struct { - Path string - Table string - } +type ConfigDB struct { + // When Driver is specified it takes precedence over other DB related + // fields. + Driver database.DB + + Type string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"` + Badger struct { + Path string `long:"db.badger.path" usage:"path to badger DB"` } - - API struct { - Enabled bool - - HTTP struct { - Address string - } - GRPC struct { - Address string - } + Postgres struct { + ConnectionString string `long:"db.postgres.connection-string" usage:"postgres connection string, may be a database URL or in PostgreSQL keyword/value format"` + Table string `long:"db.postgres.table" usage:"postgres table in which to store data (will be created if it does not exist)"` } + SQLite struct { + Path string `long:"db.sqlite.path" usage:"path to sqlite3 DB"` + Table string `long:"db.sqlite.table" usage:"sqlite3 table in which to store data (will be created if it does not exist)"` + } +} - Log struct { - NewLogger func(level, format string) log.CtxLogger - Level string - Format string +type ConfigAPI struct { + Enabled bool `long:"api.enabled" usage:"enable HTTP and gRPC API"` + HTTP struct { + Address string `long:"http.address" usage:"address for serving the HTTP API"` + } + GRPC struct { + Address string `long:"grpc.address" usage:"address for serving the gRPC API"` } +} + +type ConfigLog struct { + NewLogger func(level, format string) log.CtxLogger + Level string `long:"log.level" usage:"sets logging level; accepts debug, info, warn, error, trace"` + Format string `long:"log.format" usage:"sets the format of the logging; accepts json, cli"` +} + +// Config holds all configurable values for Conduit. +type Config struct { + DB ConfigDB + API ConfigAPI + Log ConfigLog Connectors struct { - Path string + Path string `long:"connectors.path" usage:"path to standalone connectors' directory"` } Processors struct { - Path string + Path string `long:"processors.path" usage:"path to standalone processors' directory"` } Pipelines struct { @@ -90,37 +93,37 @@ type Config struct { ExitOnDegraded bool ErrorRecovery struct { // MinDelay is the minimum delay before restart: Default: 1 second - MinDelay time.Duration + MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"` // MaxDelay is the maximum delay before restart: Default: 10 minutes - MaxDelay time.Duration + MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"` // BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2 - BackoffFactor int + BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"` // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) - MaxRetries int64 + MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"` // MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes - MaxRetriesWindow time.Duration + MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"` } } ConnectorPlugins map[string]sdk.Connector SchemaRegistry struct { - Type string + Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"` Confluent struct { - ConnectionString string + ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"` } } Preview struct { // PipelineArchV2 enables the new pipeline architecture. - PipelineArchV2 bool + PipelineArchV2 bool `long:"preview.pipeline-arch-v2" usage:"enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"` } - dev struct { - cpuprofile string - memprofile string - blockprofile string + Dev struct { + CPUProfile string `long:"dev.cpuprofile" usage:"write CPU profile to file"` + MemProfile string `long:"dev.memprofile" usage:"write memory profile to file"` + BlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"` } } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 259dd3fdf..e9cce35c1 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -16,35 +16,18 @@ package conduit import ( "context" - "flag" "fmt" "os" "os/signal" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/peterbourgon/ff/v3" - "github.com/peterbourgon/ff/v3/ffyaml" ) const ( exitCodeErr = 1 exitCodeInterrupt = 2 - - // Deprecated: Use `pipelines.error-recovery.exit-on-degraded` instead. - FlagPipelinesExitOnError = "pipelines.exit-on-error" ) -// HiddenFlags is a map of flags that should not be shown in the help output. -var HiddenFlags = map[string]bool{ - FlagPipelinesExitOnError: true, -} - -// Serve is a shortcut for Entrypoint.Serve. -func Serve(cfg Config) { - e := &Entrypoint{} - e.Serve(cfg) -} - // Entrypoint provides methods related to the Conduit entrypoint (parsing // config, managing interrupt signals etc.). type Entrypoint struct{} @@ -59,9 +42,6 @@ type Entrypoint struct{} // - environment variables // - config file (lowest priority) func (e *Entrypoint) Serve(cfg Config) { - flags := Flags(&cfg) - e.ParseConfig(flags) - if cfg.Log.Format == "cli" { _, _ = fmt.Fprintf(os.Stdout, "%s\n", e.Splash()) } @@ -79,140 +59,6 @@ func (e *Entrypoint) Serve(cfg Config) { } } -// Flags returns a flag set that, when parsed, stores the values in the provided -// config struct. -func Flags(cfg *Config) *flag.FlagSet { - // TODO extract flags from config struct rather than defining flags manually - flags := flag.NewFlagSet("conduit", flag.ExitOnError) - - flags.StringVar(&cfg.DB.Type, "db.type", cfg.DB.Type, "database type; accepts badger,postgres,inmemory,sqlite") - flags.StringVar(&cfg.DB.Badger.Path, "db.badger.path", cfg.DB.Badger.Path, "path to badger DB") - flags.StringVar( - &cfg.DB.Postgres.ConnectionString, - "db.postgres.connection-string", - cfg.DB.Postgres.ConnectionString, - "postgres connection string, may be a database URL or in PostgreSQL keyword/value format", - ) - flags.StringVar(&cfg.DB.Postgres.Table, "db.postgres.table", cfg.DB.Postgres.Table, "postgres table in which to store data (will be created if it does not exist)") - flags.StringVar(&cfg.DB.SQLite.Path, "db.sqlite.path", cfg.DB.SQLite.Path, "path to sqlite3 DB") - flags.StringVar(&cfg.DB.SQLite.Table, "db.sqlite.table", cfg.DB.SQLite.Table, "sqlite3 table in which to store data (will be created if it does not exist)") - flags.BoolVar(&cfg.API.Enabled, "api.enabled", cfg.API.Enabled, "enable HTTP and gRPC API") - flags.StringVar(&cfg.API.HTTP.Address, "http.address", cfg.API.HTTP.Address, "address for serving the HTTP API") - flags.StringVar(&cfg.API.GRPC.Address, "grpc.address", cfg.API.GRPC.Address, "address for serving the gRPC API") - - flags.StringVar(&cfg.Log.Level, "log.level", cfg.Log.Level, "sets logging level; accepts debug, info, warn, error, trace") - flags.StringVar(&cfg.Log.Format, "log.format", cfg.Log.Format, "sets the format of the logging; accepts json, cli") - - flags.StringVar(&cfg.Connectors.Path, "connectors.path", cfg.Connectors.Path, "path to standalone connectors' directory") - flags.StringVar(&cfg.Processors.Path, "processors.path", cfg.Processors.Path, "path to standalone processors' directory") - - // Pipeline configuration - flags.StringVar( - &cfg.Pipelines.Path, - "pipelines.path", - cfg.Pipelines.Path, - "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file", - ) - - // Deprecated: use `pipelines.exit-on-degraded` instead - // Note: If both `pipeline.exit-on-error` and `pipeline.exit-on-degraded` are set, `pipeline.exit-on-degraded` will take precedence - flags.BoolVar( - &cfg.Pipelines.ExitOnDegraded, - FlagPipelinesExitOnError, - cfg.Pipelines.ExitOnDegraded, - "Deprecated: use `exit-on-degraded` instead.\nexit Conduit if a pipeline experiences an error while running", - ) - - flags.BoolVar( - &cfg.Pipelines.ExitOnDegraded, - "pipelines.exit-on-degraded", - cfg.Pipelines.ExitOnDegraded, - "exit Conduit if a pipeline enters a degraded state", - ) - - flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.MinDelay, - "pipelines.error-recovery.min-delay", - cfg.Pipelines.ErrorRecovery.MinDelay, - "minimum delay before restart", - ) - flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.MaxDelay, - "pipelines.error-recovery.max-delay", - cfg.Pipelines.ErrorRecovery.MaxDelay, - "maximum delay before restart", - ) - flags.IntVar( - &cfg.Pipelines.ErrorRecovery.BackoffFactor, - "pipelines.error-recovery.backoff-factor", - cfg.Pipelines.ErrorRecovery.BackoffFactor, - "backoff factor applied to the last delay", - ) - flags.Int64Var( - &cfg.Pipelines.ErrorRecovery.MaxRetries, - "pipelines.error-recovery.max-retries", - cfg.Pipelines.ErrorRecovery.MaxRetries, - "maximum number of retries", - ) - flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, - "pipelines.error-recovery.max-retries-window", - cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, - "amount of time running without any errors after which a pipeline is considered healthy", - ) - - flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent") - flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string") - - flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)") - - flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file") - flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file") - flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file") - - // show user or dev flags - flags.Usage = func() { - tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError) - - // preserve original flag's output to the same writer - tmpFlags.SetOutput(flags.Output()) - - flags.VisitAll(func(f *flag.Flag) { - if HiddenFlags[f.Name] { - return - } - // reset value to its default, to ensure default is shown correctly - _ = f.Value.Set(f.DefValue) - tmpFlags.Var(f.Value, f.Name, f.Usage) - }) - tmpFlags.Usage() - } - - return flags -} - -func (e *Entrypoint) ParseConfig(flags *flag.FlagSet) { - _ = flags.String("config", "conduit.yaml", "global config file") - version := flags.Bool("version", false, "prints current Conduit version") - - // flags is set up to exit on error, we can safely ignore the error - err := ff.Parse(flags, os.Args[1:], - ff.WithEnvVarPrefix("CONDUIT"), - ff.WithConfigFileFlag("config"), - ff.WithConfigFileParser(ffyaml.Parser), - ff.WithAllowMissingConfigFile(true), - ) - if err != nil { - e.exitWithError(err) - } - - // check if the -version flag is set - if *version { - _, _ = fmt.Fprintf(os.Stdout, "%s\n", Version(true)) - os.Exit(0) - } -} - // CancelOnInterrupt returns a context that is canceled when the interrupt // signal is received. // * After the first signal the function will continue to listen diff --git a/pkg/conduit/entrypoint_test.go b/pkg/conduit/entrypoint_test.go deleted file mode 100644 index 21c56bdad..000000000 --- a/pkg/conduit/entrypoint_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright © 2024 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package conduit - -import ( - "bytes" - "strings" - "testing" -) - -func TestFlags_HelpOutput(t *testing.T) { - var buf bytes.Buffer - - flags := Flags(&Config{}) - flags.SetOutput(&buf) - - flags.Usage() - output := buf.String() - - expectedFlags := []string{ - "db.type", - "db.badger.path", - "db.postgres.connection-string", - "db.postgres.table", - "db.sqlite.path", - "db.sqlite.table", - "dev.blockprofileblockprofile", - "dev.cpuprofile", - "dev.memprofile", - "api.enabled", - "http.address", - "grpc.address", - "log.level", - "log.format", - "connectors.path", - "processors.path", - "pipelines.path", - "pipelines.exit-on-degraded", - "pipelines.error-recovery.min-delay", - "pipelines.error-recovery.max-delay", - "pipelines.error-recovery.backoff-factor", - "pipelines.error-recovery.max-retries", - "pipelines.error-recovery.max-retries-window", - "schema-registry.type", - "schema-registry.confluent.connection-string", - "preview.pipeline-arch-v2", - } - - unexpectedFlags := []string{ - FlagPipelinesExitOnError, - } - - for _, flag := range expectedFlags { - if !strings.Contains(output, flag) { - t.Errorf("expected flag %q not found in help output", flag) - } - } - - for _, flag := range unexpectedFlags { - if strings.Contains(output, flag) { - t.Errorf("unexpected flag %q found in help output", flag) - } - } -} diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 74df8dcb2..c9c94d2c6 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -388,8 +388,8 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error } }() - if r.Config.dev.cpuprofile != "" { - f, err := os.Create(r.Config.dev.cpuprofile) + if r.Config.Dev.CPUProfile != "" { + f, err := os.Create(r.Config.Dev.CPUProfile) if err != nil { return deferred, cerrors.Errorf("could not create CPU profile: %w", err) } @@ -399,9 +399,9 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error } deferFunc(pprof.StopCPUProfile) } - if r.Config.dev.memprofile != "" { + if r.Config.Dev.MemProfile != "" { deferFunc(func() { - f, err := os.Create(r.Config.dev.memprofile) + f, err := os.Create(r.Config.Dev.MemProfile) if err != nil { r.logger.Err(ctx, err).Msg("could not create memory profile") return @@ -413,10 +413,10 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error } }) } - if r.Config.dev.blockprofile != "" { + if r.Config.Dev.BlockProfile != "" { runtime.SetBlockProfileRate(1) deferFunc(func() { - f, err := os.Create(r.Config.dev.blockprofile) + f, err := os.Create(r.Config.Dev.BlockProfile) if err != nil { r.logger.Err(ctx, err).Msg("could not create block profile") return