diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go
deleted file mode 100644
index 90c39509b..000000000
--- a/cmd/cli/cli.go
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright © 2024 Meroxa, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cli
-
-import (
- "fmt"
- "os"
-
- "github.com/conduitio/conduit/pkg/conduit"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
-)
-
-var (
- initArgs InitArgs
- pipelinesInitArgs PipelinesInitArgs
-)
-
-type Instance struct {
- rootCmd *cobra.Command
-}
-
-// New creates a new CLI Instance.
-func New() *Instance {
- return &Instance{
- rootCmd: buildRootCmd(),
- }
-}
-
-func (i *Instance) Run() {
- if err := i.rootCmd.Execute(); err != nil {
- _, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
-}
-
-func buildRootCmd() *cobra.Command {
- cfg := conduit.DefaultConfig()
-
- cmd := &cobra.Command{
- Use: "conduit",
- Short: "Conduit CLI",
- Long: "Conduit CLI is a command-line that helps you interact with and manage Conduit.",
- Version: conduit.Version(true),
- Run: func(cmd *cobra.Command, args []string) {
- e := &conduit.Entrypoint{}
- e.Serve(cfg)
- },
- }
- cmd.CompletionOptions.DisableDefaultCmd = true
- conduit.Flags(&cfg).VisitAll(cmd.Flags().AddGoFlag)
-
- // init
- cmd.AddCommand(buildInitCmd())
-
- // pipelines
- cmd.AddGroup(&cobra.Group{
- ID: "pipelines",
- Title: "Pipelines",
- })
- cmd.AddCommand(buildPipelinesCmd())
-
- // mark hidden flags
- cmd.Flags().VisitAll(func(f *pflag.Flag) {
- if conduit.HiddenFlags[f.Name] {
- err := cmd.Flags().MarkHidden(f.Name)
- if err != nil {
- fmt.Fprintf(os.Stderr, "failed to mark flag %q as hidden: %v", f.Name, err)
- }
- }
- })
-
- return cmd
-}
-
-func buildInitCmd() *cobra.Command {
- initCmd := &cobra.Command{
- Use: "init",
- Short: "Initialize Conduit with a configuration file and directories.",
- Args: cobra.NoArgs,
- RunE: func(cmd *cobra.Command, args []string) error {
- return NewConduitInit(initArgs).Run()
- },
- }
- initCmd.Flags().StringVar(
- &initArgs.Path,
- "config.path",
- "",
- "path where Conduit will be initialized",
- )
-
- return initCmd
-}
-
-func buildPipelinesCmd() *cobra.Command {
- pipelinesCmd := &cobra.Command{
- Use: "pipelines",
- Short: "Initialize and manage pipelines",
- Args: cobra.NoArgs,
- GroupID: "pipelines",
- }
-
- pipelinesCmd.AddCommand(buildPipelinesInitCmd())
-
- return pipelinesCmd
-}
-
-func buildPipelinesInitCmd() *cobra.Command {
- pipelinesInitCmd := &cobra.Command{
- Use: "init [pipeline-name]",
- Short: "Initialize an example pipeline.",
- Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors
-initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then
-a simple and runnable generator-to-log pipeline is configured.`,
- Args: cobra.MaximumNArgs(1),
- Example: " conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --path pipelines/pg-to-kafka.yaml",
- RunE: func(cmd *cobra.Command, args []string) error {
- if len(args) > 0 {
- pipelinesInitArgs.Name = args[0]
- }
- return NewPipelinesInit(pipelinesInitArgs).Run()
- },
- }
-
- // Add flags to pipelines init command
- pipelinesInitCmd.Flags().StringVar(
- &pipelinesInitArgs.Source,
- "source",
- "",
- "Source connector (any of the built-in connectors).",
- )
- pipelinesInitCmd.Flags().StringVar(
- &pipelinesInitArgs.Destination,
- "destination",
- "",
- "Destination connector (any of the built-in connectors).",
- )
- pipelinesInitCmd.Flags().StringVar(
- &pipelinesInitArgs.Path,
- "pipelines.path",
- "./pipelines",
- "Path where the pipeline will be saved.",
- )
-
- return pipelinesInitCmd
-}
diff --git a/cmd/cli/cli_test.go b/cmd/cli/cli_test.go
deleted file mode 100644
index 7bca0b0d1..000000000
--- a/cmd/cli/cli_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright © 2024 Meroxa, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cli
-
-import (
- "bytes"
- "strings"
- "testing"
-
- "github.com/conduitio/conduit/pkg/conduit"
-)
-
-func TestBuildRootCmd_HelpOutput(t *testing.T) {
- cmd := buildRootCmd()
-
- var buf bytes.Buffer
- cmd.SetOut(&buf)
- cmd.SetArgs([]string{"--help"})
-
- err := cmd.Execute()
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
-
- output := buf.String()
-
- expectedFlags := []string{
- "db.type",
- "db.badger.path",
- "db.postgres.connection-string",
- "db.postgres.table",
- "db.sqlite.path",
- "db.sqlite.table",
- "dev.blockprofileblockprofile",
- "dev.cpuprofile",
- "dev.memprofile",
- "api.enabled",
- "http.address",
- "grpc.address",
- "log.level",
- "log.format",
- "connectors.path",
- "processors.path",
- "pipelines.path",
- "pipelines.exit-on-degraded",
- "pipelines.error-recovery.min-delay",
- "pipelines.error-recovery.max-delay",
- "pipelines.error-recovery.backoff-factor",
- "pipelines.error-recovery.max-retries",
- "pipelines.error-recovery.max-retries-window",
- "schema-registry.type",
- "schema-registry.confluent.connection-string",
- "preview.pipeline-arch-v2",
- }
-
- unexpectedFlags := []string{
- conduit.FlagPipelinesExitOnError, //nolint:staticcheck // this will be completely removed before Conduit 1.0
- }
-
- for _, flag := range expectedFlags {
- if !strings.Contains(output, flag) {
- t.Errorf("expected flag %q not found in help output", flag)
- }
- }
-
- for _, flag := range unexpectedFlags {
- if strings.Contains(output, flag) {
- t.Errorf("unexpected flag %q found in help output", flag)
- }
- }
-}
diff --git a/cmd/cli/conduit_init.go b/cmd/cli/conduit_init.go
deleted file mode 100644
index 40ece5351..000000000
--- a/cmd/cli/conduit_init.go
+++ /dev/null
@@ -1,122 +0,0 @@
-// Copyright © 2024 Meroxa, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cli
-
-import (
- "flag"
- "fmt"
- "os"
- "path/filepath"
-
- "github.com/conduitio/conduit/cmd/cli/internal"
- "github.com/conduitio/conduit/pkg/conduit"
- "github.com/conduitio/conduit/pkg/foundation/cerrors"
- "github.com/conduitio/yaml/v3"
-)
-
-type InitArgs struct {
- Path string
-}
-
-type ConduitInit struct {
- args InitArgs
-}
-
-func NewConduitInit(args InitArgs) *ConduitInit {
- return &ConduitInit{args: args}
-}
-
-func (i *ConduitInit) Run() error {
- err := i.createDirs()
- if err != nil {
- return err
- }
-
- err = i.createConfigYAML()
- if err != nil {
- return fmt.Errorf("failed to create config YAML: %w", err)
- }
-
- fmt.Println(`
-Conduit has been initialized!
-
-To quickly create an example pipeline, run 'conduit pipelines init'.
-To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`)
-
- return nil
-}
-
-func (i *ConduitInit) createConfigYAML() error {
- cfgYAML := internal.NewYAMLTree()
- i.conduitCfgFlags().VisitAll(func(f *flag.Flag) {
- if conduit.HiddenFlags[f.Name] {
- return // hide flag from output
- }
- cfgYAML.Insert(f.Name, f.DefValue, f.Usage)
- })
-
- yamlData, err := yaml.Marshal(cfgYAML.Root)
- if err != nil {
- return cerrors.Errorf("error marshaling YAML: %w\n", err)
- }
-
- path := filepath.Join(i.path(), "conduit.yaml")
- err = os.WriteFile(path, yamlData, 0o600)
- if err != nil {
- return cerrors.Errorf("error writing conduit.yaml: %w", err)
- }
- fmt.Printf("Configuration file written to %v\n", path)
-
- return nil
-}
-
-func (i *ConduitInit) createDirs() error {
- dirs := []string{"processors", "connectors", "pipelines"}
-
- for _, dir := range dirs {
- path := filepath.Join(i.path(), dir)
-
- // Attempt to create the directory, skipping if it already exists
- if err := os.Mkdir(path, os.ModePerm); err != nil {
- if os.IsExist(err) {
- fmt.Printf("Directory '%s' already exists, skipping...\n", path)
- continue
- }
- return fmt.Errorf("failed to create directory '%s': %w", path, err)
- }
-
- fmt.Printf("Created directory: %s\n", path)
- }
-
- return nil
-}
-
-func (i *ConduitInit) conduitCfgFlags() *flag.FlagSet {
- cfg := conduit.DefaultConfigWithBasePath(i.path())
- return conduit.Flags(&cfg)
-}
-
-func (i *ConduitInit) path() string {
- if i.args.Path != "" {
- return i.args.Path
- }
-
- path, err := os.Getwd()
- if err != nil {
- panic(cerrors.Errorf("failed to get current working directory: %w", err))
- }
-
- return path
-}
diff --git a/cmd/cli/pipelines_init.go b/cmd/cli/pipelines_init.go
deleted file mode 100644
index 8b27c420b..000000000
--- a/cmd/cli/pipelines_init.go
+++ /dev/null
@@ -1,354 +0,0 @@
-// Copyright © 2024 Meroxa, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cli
-
-import (
- _ "embed"
- "fmt"
- "log"
- "os"
- "path/filepath"
- "strings"
- "text/template"
-
- "github.com/conduitio/conduit-commons/config"
- "github.com/conduitio/conduit/pkg/foundation/cerrors"
- "github.com/conduitio/conduit/pkg/plugin"
- "github.com/conduitio/conduit/pkg/plugin/connector/builtin"
-)
-
-//go:embed pipeline.tmpl
-var pipelineCfgTmpl string
-
-var funcMap = template.FuncMap{
- "formatParameterValueTable": formatParameterValueTable,
- "formatParameterValueYAML": formatParameterValueYAML,
- "formatParameterDescriptionYAML": formatParameterDescriptionYAML,
- "formatParameterRequired": formatParameterRequired,
-}
-
-func formatParameterRequired(param config.Parameter) string {
- for _, v := range param.Validations {
- if v.Type() == config.ValidationTypeRequired {
- return "Required"
- }
- }
-
- return "Optional"
-}
-
-// formatParameterValue formats the value of a configuration parameter.
-func formatParameterValueTable(value string) string {
- switch {
- case value == "":
- return ``
- case strings.Contains(value, "\n"):
- // specifically used in the javascript processor
- return fmt.Sprintf("\n```js\n%s\n```\n", value)
- default:
- return fmt.Sprintf("`%s`", value)
- }
-}
-
-func formatParameterDescriptionYAML(description string) string {
- const (
- indentLen = 10
- prefix = "# "
- lineLen = 80
- tmpNewLine = "〠"
- )
-
- // remove markdown new lines
- description = strings.ReplaceAll(description, "\n\n", tmpNewLine)
- description = strings.ReplaceAll(description, "\n", " ")
- description = strings.ReplaceAll(description, tmpNewLine, "\n")
-
- formattedDescription := formatMultiline(description, strings.Repeat(" ", indentLen)+prefix, lineLen)
- // remove first indent and last new line
- formattedDescription = formattedDescription[indentLen : len(formattedDescription)-1]
- return formattedDescription
-}
-
-func formatMultiline(
- input string,
- prefix string,
- maxLineLen int,
-) string {
- textLen := maxLineLen - len(prefix)
-
- // split the input into lines of length textLen
- lines := strings.Split(input, "\n")
- var formattedLines []string
- for _, line := range lines {
- if len(line) <= textLen {
- formattedLines = append(formattedLines, line)
- continue
- }
-
- // split the line into multiple lines, don't break words
- words := strings.Fields(line)
- var formattedLine string
- for _, word := range words {
- if len(formattedLine)+len(word) > textLen {
- formattedLines = append(formattedLines, formattedLine[1:])
- formattedLine = ""
- }
- formattedLine += " " + word
- }
- if formattedLine != "" {
- formattedLines = append(formattedLines, formattedLine[1:])
- }
- }
-
- // combine lines including indent and prefix
- var formatted string
- for _, line := range formattedLines {
- formatted += prefix + line + "\n"
- }
-
- return formatted
-}
-
-func formatParameterValueYAML(value string) string {
- switch {
- case value == "":
- return `""`
- case strings.Contains(value, "\n"):
- // specifically used in the javascript processor
- formattedValue := formatMultiline(value, " ", 10000)
- return fmt.Sprintf("|\n%s", formattedValue)
- default:
- return fmt.Sprintf(`'%s'`, value)
- }
-}
-
-const (
- defaultDestination = "file"
- defaultSource = "generator"
-)
-
-type pipelineTemplate struct {
- Name string
- SourceSpec connectorTemplate
- DestinationSpec connectorTemplate
-}
-
-type connectorTemplate struct {
- Name string
- Params config.Parameters
-}
-
-type PipelinesInitArgs struct {
- Name string
- Source string
- Destination string
- Path string
-}
-
-type PipelinesInit struct {
- args PipelinesInitArgs
-}
-
-func NewPipelinesInit(args PipelinesInitArgs) *PipelinesInit {
- return &PipelinesInit{args: args}
-}
-
-func (pi *PipelinesInit) Run() error {
- var pipeline pipelineTemplate
- // if no source/destination arguments are provided,
- // we build a runnable example pipeline
- if pi.args.Source == "" && pi.args.Destination == "" {
- pipeline = pi.buildDemoPipeline()
- } else {
- p, err := pi.buildTemplatePipeline()
- if err != nil {
- return err
- }
- pipeline = p
- }
-
- err := pi.write(pipeline)
- if err != nil {
- return cerrors.Errorf("could not write pipeline: %w", err)
- }
-
- fmt.Printf(`Your pipeline has been initialized and created at %s.
-
-To run the pipeline, simply run 'conduit'.`, pi.configFilePath())
-
- return nil
-}
-
-func (pi *PipelinesInit) buildTemplatePipeline() (pipelineTemplate, error) {
- srcParams, err := pi.getSourceParams()
- if err != nil {
- return pipelineTemplate{}, cerrors.Errorf("failed getting source params: %w", err)
- }
-
- dstParams, err := pi.getDestinationParams()
- if err != nil {
- return pipelineTemplate{}, cerrors.Errorf("failed getting destination params: %w", err)
- }
-
- return pipelineTemplate{
- Name: pi.pipelineName(),
- SourceSpec: srcParams,
- DestinationSpec: dstParams,
- }, nil
-}
-
-func (pi *PipelinesInit) buildDemoPipeline() pipelineTemplate {
- srcParams, _ := pi.getSourceParams()
- dstParams, _ := pi.getDestinationParams()
-
- return pipelineTemplate{
- Name: pi.pipelineName(),
- SourceSpec: connectorTemplate{
- Name: defaultSource,
- Params: map[string]config.Parameter{
- "format.type": {
- Description: srcParams.Params["format.type"].Description,
- Type: srcParams.Params["format.type"].Type,
- Default: "structured",
- Validations: srcParams.Params["format.type"].Validations,
- },
- "format.options.scheduledDeparture": {
- Description: "Generate field 'scheduledDeparture' of type 'time'",
- Type: config.ParameterTypeString,
- Default: "time",
- },
- "format.options.airline": {
- Description: "Generate field 'airline' of type string",
- Type: config.ParameterTypeString,
- Default: "string",
- },
- "rate": {
- Description: srcParams.Params["rate"].Description,
- Type: srcParams.Params["rate"].Type,
- Default: "1",
- },
- },
- },
- DestinationSpec: connectorTemplate{
- Name: defaultDestination,
- Params: map[string]config.Parameter{
- "path": {
- Description: dstParams.Params["path"].Description,
- Type: dstParams.Params["path"].Type,
- Default: "./destination.txt",
- },
- },
- },
- }
-}
-
-func (pi *PipelinesInit) getOutput() *os.File {
- output, err := os.OpenFile(pi.configFilePath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
- if err != nil {
- log.Fatalf("error: failed to open %s: %v", pi.args.Path, err)
- }
-
- return output
-}
-
-func (pi *PipelinesInit) write(pipeline pipelineTemplate) error {
- t, err := template.New("").Funcs(funcMap).Option("missingkey=zero").Parse(pipelineCfgTmpl)
- if err != nil {
- return cerrors.Errorf("failed parsing template: %w", err)
- }
-
- output := pi.getOutput()
- defer output.Close()
-
- err = t.Execute(output, pipeline)
- if err != nil {
- return cerrors.Errorf("failed executing template: %w", err)
- }
-
- return nil
-}
-
-func (pi *PipelinesInit) getSourceParams() (connectorTemplate, error) {
- for _, conn := range builtin.DefaultBuiltinConnectors {
- specs := conn.NewSpecification()
- if specs.Name == pi.sourceConnector() || specs.Name == "builtin:"+pi.sourceConnector() {
- if conn.NewSource == nil {
- return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.sourceConnector())
- }
-
- return connectorTemplate{
- Name: specs.Name,
- Params: conn.NewSource().Parameters(),
- }, nil
- }
- }
-
- return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.sourceConnector(), plugin.ErrPluginNotFound)
-}
-
-func (pi *PipelinesInit) getDestinationParams() (connectorTemplate, error) {
- for _, conn := range builtin.DefaultBuiltinConnectors {
- specs := conn.NewSpecification()
- if specs.Name == pi.destinationConnector() || specs.Name == "builtin:"+pi.destinationConnector() {
- if conn.NewDestination == nil {
- return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.destinationConnector())
- }
-
- return connectorTemplate{
- Name: specs.Name,
- Params: conn.NewDestination().Parameters(),
- }, nil
- }
- }
-
- return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.destinationConnector(), plugin.ErrPluginNotFound)
-}
-
-func (pi *PipelinesInit) configFilePath() string {
- path := pi.args.Path
- if path == "" {
- path = "./pipelines"
- }
-
- return filepath.Join(path, pi.configFileName())
-}
-
-func (pi *PipelinesInit) configFileName() string {
- return fmt.Sprintf("pipeline-%s.yaml", pi.pipelineName())
-}
-
-func (pi *PipelinesInit) sourceConnector() string {
- if pi.args.Source != "" {
- return pi.args.Source
- }
-
- return defaultSource
-}
-
-func (pi *PipelinesInit) destinationConnector() string {
- if pi.args.Destination != "" {
- return pi.args.Destination
- }
-
- return defaultDestination
-}
-
-func (pi *PipelinesInit) pipelineName() string {
- if pi.args.Name != "" {
- return pi.args.Name
- }
-
- return fmt.Sprintf("%s-to-%s", pi.sourceConnector(), pi.destinationConnector())
-}
diff --git a/cmd/cli/internal/yaml.go b/cmd/conduit/internal/yaml.go
similarity index 100%
rename from cmd/cli/internal/yaml.go
rename to cmd/conduit/internal/yaml.go
diff --git a/cmd/conduit/main.go b/cmd/conduit/main.go
index a744d7f3e..0a28295a6 100644
--- a/cmd/conduit/main.go
+++ b/cmd/conduit/main.go
@@ -15,9 +15,22 @@
package main
import (
- "github.com/conduitio/conduit/cmd/cli"
+ "fmt"
+ "os"
+
+ "github.com/conduitio/conduit/cmd/conduit/root"
+ "github.com/conduitio/ecdysis"
)
func main() {
- cli.New().Run()
+ e := ecdysis.New()
+
+ cmd := e.MustBuildCobraCommand(&root.RootCommand{})
+ cmd.CompletionOptions.DisableDefaultCmd = true
+
+ if err := cmd.Execute(); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
+ os.Exit(1)
+ }
+ os.Exit(0)
}
diff --git a/cmd/conduit/root/init.go b/cmd/conduit/root/init.go
new file mode 100644
index 000000000..365d61f12
--- /dev/null
+++ b/cmd/conduit/root/init.go
@@ -0,0 +1,147 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package root
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "reflect"
+
+ "github.com/conduitio/conduit/cmd/conduit/internal"
+ "github.com/conduitio/conduit/pkg/conduit"
+ "github.com/conduitio/conduit/pkg/foundation/cerrors"
+ "github.com/conduitio/ecdysis"
+ "github.com/conduitio/yaml/v3"
+)
+
+var (
+ _ ecdysis.CommandWithExecute = (*InitCommand)(nil)
+ _ ecdysis.CommandWithDocs = (*InitCommand)(nil)
+)
+
+type InitCommand struct {
+ cfg *conduit.Config
+ rootFlags *RootFlags
+}
+
+func (c *InitCommand) Usage() string { return "init" }
+
+func (c *InitCommand) Docs() ecdysis.Docs {
+ return ecdysis.Docs{
+ Short: `Initialize Conduit with a configuration file and directories.`,
+ }
+}
+
+func (c *InitCommand) createDirs() error {
+ // These could be used based on the root flags if those were global
+ dirs := []string{"processors", "connectors", "pipelines"}
+ conduitPath := filepath.Dir(c.rootFlags.ConduitConfigPath)
+
+ for _, dir := range dirs {
+ path := filepath.Join(conduitPath, dir)
+
+ // Attempt to create the directory, skipping if it already exists
+ if err := os.Mkdir(path, os.ModePerm); err != nil {
+ if os.IsExist(err) {
+ fmt.Printf("Directory '%s' already exists, skipping...\n", path)
+ continue
+ }
+ return fmt.Errorf("failed to create directory '%s': %w", path, err)
+ }
+
+ fmt.Printf("Created directory: %s\n", path)
+ }
+
+ return nil
+}
+
+func (c *InitCommand) createConfigYAML() error {
+ cfgYAML := internal.NewYAMLTree()
+
+ v := reflect.Indirect(reflect.ValueOf(c.cfg))
+ t := v.Type()
+
+ for i := 0; i < v.NumField(); i++ {
+ field := t.Field(i)
+ fieldValue := v.Field(i)
+
+ if fieldValue.Kind() == reflect.Struct {
+ embedStructYAML(fieldValue, field, cfgYAML)
+ } else {
+ value := fmt.Sprintf("%v", fieldValue.Interface())
+ usage := field.Tag.Get("usage")
+ longName := field.Tag.Get("long")
+
+ if longName != "" {
+ cfgYAML.Insert(longName, value, usage)
+ }
+ }
+ }
+
+ yamlData, err := yaml.Marshal(cfgYAML.Root)
+ if err != nil {
+ return cerrors.Errorf("error marshaling YAML: %w\n", err)
+ }
+
+ err = os.WriteFile(c.rootFlags.ConduitConfigPath, yamlData, 0o600)
+ if err != nil {
+ return cerrors.Errorf("error writing conduit.yaml: %w", err)
+ }
+ fmt.Printf("Configuration file written to %v\n", c.rootFlags.ConduitConfigPath)
+
+ return nil
+}
+
+func embedStructYAML(v reflect.Value, field reflect.StructField, cfgYAML *internal.YAMLTree) {
+ t := v.Type()
+ for i := 0; i < v.NumField(); i++ {
+ subField := t.Field(i)
+ subFieldValue := v.Field(i)
+
+ if subFieldValue.Kind() == reflect.Struct {
+ embedStructYAML(subFieldValue, subField, cfgYAML)
+ } else {
+ value := fmt.Sprintf("%v", subFieldValue.Interface())
+ usage := subField.Tag.Get("usage")
+ longName := subField.Tag.Get("long")
+
+ if longName != "" {
+ cfgYAML.Insert(longName, value, usage)
+ }
+ }
+ }
+}
+
+func (c *InitCommand) Execute(ctx context.Context) error {
+ err := c.createDirs()
+ if err != nil {
+ return err
+ }
+
+ err = c.createConfigYAML()
+ if err != nil {
+ return fmt.Errorf("failed to create config YAML: %w", err)
+ }
+
+ fmt.Println(`
+ Conduit has been initialized!
+
+ To quickly create an example pipeline, run 'conduit pipelines init'.
+ To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`)
+
+ return nil
+}
diff --git a/cmd/conduit/root/pipelines/format_parameter.go b/cmd/conduit/root/pipelines/format_parameter.go
new file mode 100644
index 000000000..38c5e7d33
--- /dev/null
+++ b/cmd/conduit/root/pipelines/format_parameter.go
@@ -0,0 +1,125 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelines
+
+import (
+ "fmt"
+ "strings"
+ "text/template"
+
+ "github.com/conduitio/conduit-commons/config"
+)
+
+var funcMap = template.FuncMap{
+ "formatParameterValueTable": formatParameterValueTable,
+ "formatParameterValueYAML": formatParameterValueYAML,
+ "formatParameterDescriptionYAML": formatParameterDescriptionYAML,
+ "formatParameterRequired": formatParameterRequired,
+}
+
+func formatParameterRequired(param config.Parameter) string {
+ for _, v := range param.Validations {
+ if v.Type() == config.ValidationTypeRequired {
+ return "Required"
+ }
+ }
+
+ return "Optional"
+}
+
+// formatParameterValue formats the value of a configuration parameter.
+func formatParameterValueTable(value string) string {
+ switch {
+ case value == "":
+ return ``
+ case strings.Contains(value, "\n"):
+ // specifically used in the javascript processor
+ return fmt.Sprintf("\n```js\n%s\n```\n", value)
+ default:
+ return fmt.Sprintf("`%s`", value)
+ }
+}
+
+func formatParameterDescriptionYAML(description string) string {
+ const (
+ indentLen = 10
+ prefix = "# "
+ lineLen = 80
+ tmpNewLine = "〠"
+ )
+
+ // remove markdown new lines
+ description = strings.ReplaceAll(description, "\n\n", tmpNewLine)
+ description = strings.ReplaceAll(description, "\n", " ")
+ description = strings.ReplaceAll(description, tmpNewLine, "\n")
+
+ formattedDescription := formatMultiline(description, strings.Repeat(" ", indentLen)+prefix, lineLen)
+ // remove first indent and last new line
+ formattedDescription = formattedDescription[indentLen : len(formattedDescription)-1]
+ return formattedDescription
+}
+
+func formatMultiline(
+ input string,
+ prefix string,
+ maxLineLen int,
+) string {
+ textLen := maxLineLen - len(prefix)
+
+ // split the input into lines of length textLen
+ lines := strings.Split(input, "\n")
+ var formattedLines []string
+ for _, line := range lines {
+ if len(line) <= textLen {
+ formattedLines = append(formattedLines, line)
+ continue
+ }
+
+ // split the line into multiple lines, don't break words
+ words := strings.Fields(line)
+ var formattedLine string
+ for _, word := range words {
+ if len(formattedLine)+len(word) > textLen {
+ formattedLines = append(formattedLines, formattedLine[1:])
+ formattedLine = ""
+ }
+ formattedLine += " " + word
+ }
+ if formattedLine != "" {
+ formattedLines = append(formattedLines, formattedLine[1:])
+ }
+ }
+
+ // combine lines including indent and prefix
+ var formatted string
+ for _, line := range formattedLines {
+ formatted += prefix + line + "\n"
+ }
+
+ return formatted
+}
+
+func formatParameterValueYAML(value string) string {
+ switch {
+ case value == "":
+ return `""`
+ case strings.Contains(value, "\n"):
+ // specifically used in the javascript processor
+ formattedValue := formatMultiline(value, " ", 10000)
+ return fmt.Sprintf("|\n%s", formattedValue)
+ default:
+ return fmt.Sprintf(`'%s'`, value)
+ }
+}
diff --git a/cmd/conduit/root/pipelines/init.go b/cmd/conduit/root/pipelines/init.go
new file mode 100644
index 000000000..9c59be9a8
--- /dev/null
+++ b/cmd/conduit/root/pipelines/init.go
@@ -0,0 +1,253 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelines
+
+import (
+ "context"
+ _ "embed"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+ "text/template"
+
+ "github.com/conduitio/conduit-commons/config"
+ "github.com/conduitio/conduit/pkg/foundation/cerrors"
+ "github.com/conduitio/conduit/pkg/plugin"
+ "github.com/conduitio/conduit/pkg/plugin/connector/builtin"
+ "github.com/conduitio/ecdysis"
+)
+
+var (
+ _ ecdysis.CommandWithDocs = (*InitCommand)(nil)
+ _ ecdysis.CommandWithFlags = (*InitCommand)(nil)
+ _ ecdysis.CommandWithArgs = (*InitCommand)(nil)
+ _ ecdysis.CommandWithExecute = (*InitCommand)(nil)
+
+ //go:embed pipeline.tmpl
+ pipelineCfgTmpl string
+)
+
+const (
+ defaultSource = "generator"
+ defaultDestination = "file"
+)
+
+type InitArgs struct {
+ name string
+}
+
+type InitFlags struct {
+ Source string `long:"source" usage:"Source connector (any of the built-in connectors)." default:"generator"`
+ Destination string `long:"destination" usage:"Destination connector (any of the built-in connectors)." default:"file"`
+ PipelinesPath string `long:"pipelines.path" usage:"Path where the pipeline will be saved." default:"./pipelines"`
+}
+
+type InitCommand struct {
+ args InitArgs
+ flags InitFlags
+ configFilePath string
+}
+
+func (c *InitCommand) Flags() []ecdysis.Flag {
+ flags := ecdysis.BuildFlags(&c.flags)
+
+ currentPath, err := os.Getwd()
+ if err != nil {
+ panic(cerrors.Errorf("failed to get current working directory: %w", err))
+ }
+
+ flags.SetDefault("pipelines.path", filepath.Join(currentPath, "./pipelines"))
+ flags.SetDefault("source", defaultSource)
+ flags.SetDefault("destination", defaultDestination)
+
+ return flags
+}
+
+func (c *InitCommand) Args(args []string) error {
+ if len(args) == 0 {
+ return cerrors.Errorf("requires a pipeline name")
+ }
+
+ if len(args) > 1 {
+ return cerrors.Errorf("too many arguments")
+ }
+ c.args.name = args[0]
+ return nil
+}
+
+func (c *InitCommand) Usage() string { return "init" }
+
+func (c *InitCommand) Docs() ecdysis.Docs {
+ return ecdysis.Docs{
+ Short: "Initialize an example pipeline.",
+ Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors
+initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then
+a simple and runnable generator-to-log pipeline is configured.`,
+ Example: "conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --pipelines.path pipelines/pg-to-kafka.yaml",
+ }
+}
+
+func (c *InitCommand) getSourceSpec() (connectorSpec, error) {
+ for _, conn := range builtin.DefaultBuiltinConnectors {
+ specs := conn.NewSpecification()
+ if specs.Name == c.flags.Source || specs.Name == "builtin:"+c.flags.Source {
+ if conn.NewSource == nil {
+ return connectorSpec{}, cerrors.Errorf("plugin %v has no source", c.flags.Source)
+ }
+
+ return connectorSpec{
+ Name: specs.Name,
+ Params: conn.NewSource().Parameters(),
+ }, nil
+ }
+ }
+
+ return connectorSpec{}, cerrors.Errorf("%v: %w", c.flags.Source, plugin.ErrPluginNotFound)
+}
+
+func (c *InitCommand) getDestinationSpec() (connectorSpec, error) {
+ for _, conn := range builtin.DefaultBuiltinConnectors {
+ specs := conn.NewSpecification()
+ if specs.Name == c.flags.Destination || specs.Name == "builtin:"+c.flags.Destination {
+ if conn.NewDestination == nil {
+ return connectorSpec{}, cerrors.Errorf("plugin %v has no source", c.flags.Destination)
+ }
+
+ return connectorSpec{
+ Name: specs.Name,
+ Params: conn.NewDestination().Parameters(),
+ }, nil
+ }
+ }
+ return connectorSpec{}, cerrors.Errorf("%v: %w", c.flags.Destination, plugin.ErrPluginNotFound)
+}
+
+// getDemoSourceGeneratorSpec returns a simplified version of the source generator connector.
+func (c *InitCommand) getDemoSourceGeneratorSpec(spec connectorSpec) connectorSpec {
+ return connectorSpec{
+ Name: defaultSource,
+ Params: map[string]config.Parameter{
+ "format.type": {
+ Description: spec.Params["format.type"].Description,
+ Type: spec.Params["format.type"].Type,
+ Default: "structured",
+ Validations: spec.Params["format.type"].Validations,
+ },
+ "format.options.scheduledDeparture": {
+ Description: "Generate field 'scheduledDeparture' of type 'time'",
+ Type: config.ParameterTypeString,
+ Default: "time",
+ },
+ "format.options.airline": {
+ Description: "Generate field 'airline' of type string",
+ Type: config.ParameterTypeString,
+ Default: "string",
+ },
+ "rate": {
+ Description: spec.Params["rate"].Description,
+ Type: spec.Params["rate"].Type,
+ Default: "1",
+ },
+ },
+ }
+}
+
+// getDemoDestinationFileSpec returns a simplified version of the destination file connector.
+func (c *InitCommand) getDemoDestinationFileSpec(spec connectorSpec) connectorSpec {
+ return connectorSpec{
+ Name: defaultDestination,
+ Params: map[string]config.Parameter{
+ "path": {
+ Description: spec.Params["path"].Description,
+ Type: spec.Params["path"].Type,
+ Default: "./destination.txt",
+ },
+ },
+ }
+}
+
+func (c *InitCommand) buildTemplatePipeline() (pipelineTemplate, error) {
+ srcSpec, err := c.getSourceSpec()
+ if err != nil {
+ return pipelineTemplate{}, cerrors.Errorf("failed getting source params: %w", err)
+ }
+
+ // provide a simplified version
+ if c.flags.Source == defaultSource {
+ srcSpec = c.getDemoSourceGeneratorSpec(srcSpec)
+ }
+
+ dstSpec, err := c.getDestinationSpec()
+ if err != nil {
+ return pipelineTemplate{}, cerrors.Errorf("failed getting destination params: %w", err)
+ }
+
+ // provide a simplified version
+ if c.flags.Destination == defaultDestination {
+ dstSpec = c.getDemoDestinationFileSpec(dstSpec)
+ }
+
+ return pipelineTemplate{
+ Name: c.args.name,
+ SourceSpec: srcSpec,
+ DestinationSpec: dstSpec,
+ }, nil
+}
+
+func (c *InitCommand) getOutput() *os.File {
+ output, err := os.OpenFile(c.configFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
+ if err != nil {
+ log.Fatalf("error: failed to open %s: %v", c.configFilePath, err)
+ }
+
+ return output
+}
+
+func (c *InitCommand) write(pipeline pipelineTemplate) error {
+ t, err := template.New("").Funcs(funcMap).Option("missingkey=zero").Parse(pipelineCfgTmpl)
+ if err != nil {
+ return cerrors.Errorf("failed parsing template: %w", err)
+ }
+
+ output := c.getOutput()
+ defer output.Close()
+
+ err = t.Execute(output, pipeline)
+ if err != nil {
+ return cerrors.Errorf("failed executing template: %w", err)
+ }
+
+ return nil
+}
+
+func (c *InitCommand) Execute(_ context.Context) error {
+ c.configFilePath = filepath.Join(c.flags.PipelinesPath, fmt.Sprintf("pipeline-%s.yaml", c.args.name))
+
+ pipeline, err := c.buildTemplatePipeline()
+ if err != nil {
+ return err
+ }
+
+ if err := c.write(pipeline); err != nil {
+ return cerrors.Errorf("could not write pipeline: %w", err)
+ }
+
+ fmt.Printf(`Your pipeline has been initialized and created at %s.
+
+To run the pipeline, simply run 'conduit'.`, c.configFilePath)
+
+ return nil
+}
diff --git a/cmd/cli/pipeline.tmpl b/cmd/conduit/root/pipelines/pipeline.tmpl
similarity index 100%
rename from cmd/cli/pipeline.tmpl
rename to cmd/conduit/root/pipelines/pipeline.tmpl
diff --git a/cmd/conduit/root/pipelines/pipelines.go b/cmd/conduit/root/pipelines/pipelines.go
new file mode 100644
index 000000000..62dbcc91d
--- /dev/null
+++ b/cmd/conduit/root/pipelines/pipelines.go
@@ -0,0 +1,40 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelines
+
+import (
+ "github.com/conduitio/ecdysis"
+)
+
+var (
+ _ ecdysis.CommandWithDocs = (*PipelinesCommand)(nil)
+ _ ecdysis.CommandWithSubCommands = (*PipelinesCommand)(nil)
+)
+
+type PipelinesCommand struct{}
+
+func (c *PipelinesCommand) SubCommands() []ecdysis.Command {
+ return []ecdysis.Command{
+ &InitCommand{},
+ }
+}
+
+func (c *PipelinesCommand) Usage() string { return "pipelines" }
+
+func (c *PipelinesCommand) Docs() ecdysis.Docs {
+ return ecdysis.Docs{
+ Short: "Initialize and manage pipelines",
+ }
+}
diff --git a/cmd/conduit/root/pipelines/template.go b/cmd/conduit/root/pipelines/template.go
new file mode 100644
index 000000000..d7a707309
--- /dev/null
+++ b/cmd/conduit/root/pipelines/template.go
@@ -0,0 +1,28 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelines
+
+import "github.com/conduitio/conduit-commons/config"
+
+type connectorSpec struct {
+ Name string
+ Params config.Parameters
+}
+
+type pipelineTemplate struct {
+ Name string
+ SourceSpec connectorSpec
+ DestinationSpec connectorSpec
+}
diff --git a/cmd/conduit/root/root.go b/cmd/conduit/root/root.go
new file mode 100644
index 000000000..f68c9a58e
--- /dev/null
+++ b/cmd/conduit/root/root.go
@@ -0,0 +1,193 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package root
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/conduitio/conduit/cmd/conduit/root/pipelines"
+ "github.com/conduitio/conduit/pkg/conduit"
+ "github.com/conduitio/conduit/pkg/foundation/cerrors"
+ "github.com/conduitio/ecdysis"
+ "github.com/spf13/viper"
+)
+
+var (
+ _ ecdysis.CommandWithFlags = (*RootCommand)(nil)
+ _ ecdysis.CommandWithExecute = (*RootCommand)(nil)
+ _ ecdysis.CommandWithDocs = (*RootCommand)(nil)
+ _ ecdysis.CommandWithSubCommands = (*RootCommand)(nil)
+)
+
+const ConduitPrefix = "CONDUIT"
+
+type RootFlags struct {
+ // Global flags -----------------------------------------------------------
+
+ // Conduit configuration file
+ ConduitConfigPath string `long:"config.path" usage:"global conduit configuration file" persistent:"true" default:"./conduit.yaml"`
+
+ // Version
+ Version bool `long:"version" short:"v" usage:"show current Conduit version" persistent:"true"`
+
+ conduit.Config
+}
+
+type RootCommand struct {
+ flags RootFlags
+ cfg conduit.Config
+}
+
+func (c *RootCommand) updateConfig() error {
+ v := viper.New()
+
+ // Set default values
+ v.SetDefault("config.path", c.flags.ConduitConfigPath)
+ v.SetDefault("db.type", c.flags.DB.Type)
+ v.SetDefault("api.enabled", c.flags.API.Enabled)
+ v.SetDefault("log.level", c.flags.Log.Level)
+ v.SetDefault("log.format", c.flags.Log.Format)
+ v.SetDefault("connectors.path", c.flags.Connectors.Path)
+ v.SetDefault("processors.path", c.flags.Processors.Path)
+ v.SetDefault("pipelines.path", c.flags.Pipelines.Path)
+ v.SetDefault("pipelines.exit-on-degraded", c.flags.Pipelines.ExitOnDegraded)
+ v.SetDefault("pipelines.error-recovery.min-delay", c.flags.Pipelines.ErrorRecovery.MinDelay)
+ v.SetDefault("pipelines.error-recovery.max-delay", c.flags.Pipelines.ErrorRecovery.MaxDelay)
+ v.SetDefault("pipelines.error-recovery.backoff-factor", c.flags.Pipelines.ErrorRecovery.BackoffFactor)
+ v.SetDefault("pipelines.error-recovery.max-retries", c.flags.Pipelines.ErrorRecovery.MaxRetries)
+ v.SetDefault("pipelines.error-recovery.max-retries-window", c.flags.Pipelines.ErrorRecovery.MaxRetriesWindow)
+ v.SetDefault("schema-registry.type", c.flags.SchemaRegistry.Type)
+ v.SetDefault("schema-registry.confluent.connection-string", c.flags.SchemaRegistry.Confluent.ConnectionString)
+ v.SetDefault("preview.pipeline-arch-v2", c.flags.Preview.PipelineArchV2)
+ v.SetDefault("dev.cpuprofile", c.flags.Dev.CPUProfile)
+ v.SetDefault("dev.memprofile", c.flags.Dev.MemProfile)
+ v.SetDefault("dev.blockprofile", c.flags.Dev.BlockProfile)
+
+ // Read configuration from file
+ v.SetConfigFile(c.flags.ConduitConfigPath)
+
+ // ignore if file doesn't exist. Maybe we could check if user is trying read from a file that doesn't exist.
+ _ = v.ReadInConfig()
+
+ // Set environment variable prefix and automatic mapping
+ v.SetEnvPrefix(ConduitPrefix)
+ v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ v.AutomaticEnv()
+
+ v.BindEnv("db.type")
+ v.BindEnv("db.badger.path")
+ v.BindEnv("db.postgres.connection-string")
+ v.BindEnv("db.postgres.table")
+ v.BindEnv("db.sqlite.path")
+ v.BindEnv("db.sqlite.table")
+ v.BindEnv("api.enabled")
+ v.BindEnv("http.address")
+ v.BindEnv("grpc.address")
+ v.BindEnv("log.level")
+ v.BindEnv("log.format")
+ v.BindEnv("connectors.path")
+ v.BindEnv("processors.path")
+ v.BindEnv("pipelines.path")
+ v.BindEnv("pipelines.exit-on-degraded")
+ v.BindEnv("pipelines.error-recovery.min-delay")
+ v.BindEnv("pipelines.error-recovery.max-delay")
+ v.BindEnv("pipelines.error-recovery.backoff-factor")
+ v.BindEnv("pipelines.error-recovery.max-retries")
+ v.BindEnv("pipelines.error-recovery.max-retries-window")
+ v.BindEnv("schema-registry.type")
+ v.BindEnv("schema-registry.confluent.connection-string")
+ v.BindEnv("preview.pipeline-arch-v2")
+ v.BindEnv("dev.cpuprofile")
+ v.BindEnv("dev.memprofile")
+ v.BindEnv("dev.blockprofile")
+
+ if err := v.Unmarshal(&c.cfg); err != nil {
+ return fmt.Errorf("unable to unmarshal the configuration: %w", err)
+ }
+
+ return nil
+}
+
+func (c *RootCommand) Execute(_ context.Context) error {
+ if c.flags.Version {
+ _, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true))
+ return nil
+ }
+
+ if err := c.updateConfig(); err != nil {
+ return err
+ }
+
+ e := &conduit.Entrypoint{}
+ e.Serve(c.cfg)
+ return nil
+}
+
+func (c *RootCommand) Usage() string { return "conduit" }
+func (c *RootCommand) Flags() []ecdysis.Flag {
+ flags := ecdysis.BuildFlags(&c.flags)
+
+ currentPath, err := os.Getwd()
+ if err != nil {
+ panic(cerrors.Errorf("failed to get current working directory: %w", err))
+ }
+ c.cfg = conduit.DefaultConfigWithBasePath(currentPath)
+
+ conduitConfigPath := filepath.Join(currentPath, "conduit.yaml")
+ flags.SetDefault("config.path", conduitConfigPath)
+ flags.SetDefault("db.type", c.cfg.DB.Type)
+ flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path)
+ flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString)
+ flags.SetDefault("db.postgres.table", c.cfg.DB.Postgres.Table)
+ flags.SetDefault("db.sqlite.path", c.cfg.DB.SQLite.Path)
+ flags.SetDefault("db.sqlite.table", c.cfg.DB.SQLite.Table)
+ flags.SetDefault("api.enabled", c.cfg.API.Enabled)
+ flags.SetDefault("http.address", c.cfg.API.HTTP.Address)
+ flags.SetDefault("grpc.address", c.cfg.API.GRPC.Address)
+ flags.SetDefault("log.level", c.cfg.Log.Level)
+ flags.SetDefault("log.format", c.cfg.Log.Format)
+ flags.SetDefault("connectors.path", c.cfg.Connectors.Path)
+ flags.SetDefault("processors.path", c.cfg.Processors.Path)
+ flags.SetDefault("pipelines.path", c.cfg.Pipelines.Path)
+ flags.SetDefault("pipelines.exit-on-degraded", c.cfg.Pipelines.ExitOnDegraded)
+ flags.SetDefault("pipelines.error-recovery.min-delay", c.cfg.Pipelines.ErrorRecovery.MinDelay)
+ flags.SetDefault("pipelines.error-recovery.max-delay", c.cfg.Pipelines.ErrorRecovery.MaxDelay)
+ flags.SetDefault("pipelines.error-recovery.backoff-factor", c.cfg.Pipelines.ErrorRecovery.BackoffFactor)
+ flags.SetDefault("pipelines.error-recovery.max-retries", c.cfg.Pipelines.ErrorRecovery.MaxRetries)
+ flags.SetDefault("pipelines.error-recovery.max-retries-window", c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow)
+ flags.SetDefault("schema-registry.type", c.cfg.SchemaRegistry.Type)
+ flags.SetDefault("schema-registry.confluent.connection-string", c.cfg.SchemaRegistry.Confluent.ConnectionString)
+ flags.SetDefault("preview.pipeline-arch-v2", c.cfg.Preview.PipelineArchV2)
+
+ return flags
+}
+
+func (c *RootCommand) Docs() ecdysis.Docs {
+ return ecdysis.Docs{
+ Short: "Conduit CLI",
+ Long: `Conduit CLI is a command-line that helps you interact with and manage Conduit.`,
+ }
+}
+
+func (c *RootCommand) SubCommands() []ecdysis.Command {
+ return []ecdysis.Command{
+ &InitCommand{cfg: &c.cfg, rootFlags: &c.flags},
+ &pipelines.PipelinesCommand{},
+ }
+}
diff --git a/cmd/conduit/root/root_test.go b/cmd/conduit/root/root_test.go
new file mode 100644
index 000000000..4ad0127e9
--- /dev/null
+++ b/cmd/conduit/root/root_test.go
@@ -0,0 +1,245 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package root
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/conduitio/conduit/pkg/conduit"
+ "github.com/conduitio/ecdysis"
+ isT "github.com/matryer/is"
+)
+
+func TestRootCommandFlags(t *testing.T) {
+ is := isT.New(t)
+
+ expectedFlags := []struct {
+ longName string
+ shortName string
+ required bool
+ persistent bool
+ hidden bool
+ }{
+ {longName: "config.path", persistent: true},
+ {longName: "version", shortName: "v", persistent: true},
+ {longName: "db.type"},
+ {longName: "db.badger.path"},
+ {longName: "db.postgres.connection-string"},
+ {longName: "db.postgres.table"},
+ {longName: "db.sqlite.path"},
+ {longName: "db.sqlite.table"},
+ {longName: "api.enabled"},
+ {longName: "http.address"},
+ {longName: "grpc.address"},
+ {longName: "log.level"},
+ {longName: "log.format"},
+ {longName: "connectors.path"},
+ {longName: "processors.path"},
+ {longName: "pipelines.path"},
+ {longName: "pipelines.exit-on-degraded"},
+ {longName: "pipelines.error-recovery.min-delay"},
+ {longName: "pipelines.error-recovery.max-delay"},
+ {longName: "pipelines.error-recovery.backoff-factor"},
+ {longName: "pipelines.error-recovery.max-retries"},
+ {longName: "pipelines.error-recovery.max-retries-window"},
+ {longName: "schema-registry.type"},
+ {longName: "schema-registry.confluent.connection-string"},
+ {longName: "preview.pipeline-arch-v2"},
+ {longName: "dev.cpuprofile"},
+ {longName: "dev.memprofile"},
+ {longName: "dev.blockprofile"},
+ }
+
+ c := &RootCommand{}
+ flags := c.Flags()
+
+ for _, ef := range expectedFlags {
+ var foundFlag *ecdysis.Flag
+ for _, f := range flags {
+ if f.Long == ef.longName {
+ foundFlag = &f
+ break
+ }
+ }
+
+ is.True(foundFlag != nil)
+
+ if foundFlag != nil {
+ is.Equal(ef.shortName, foundFlag.Short)
+ is.Equal(ef.required, foundFlag.Required)
+ is.Equal(ef.persistent, foundFlag.Persistent)
+ is.Equal(ef.hidden, foundFlag.Hidden)
+ }
+ }
+}
+
+func TestRootCommand_updateConfig(t *testing.T) {
+ is := isT.New(t)
+
+ tmpDir, err := os.MkdirTemp("", "config-test")
+ is.NoErr(err)
+ defer os.RemoveAll(tmpDir)
+
+ configFileContent := `
+db:
+ type: "sqlite"
+ sqlite:
+ path: "/custom/path/db"
+log:
+ level: "debug"
+ format: "json"
+api:
+ enabled: false
+`
+
+ configPath := filepath.Join(tmpDir, "config.yaml")
+ err = os.WriteFile(configPath, []byte(configFileContent), 0644)
+ is.NoErr(err)
+
+ defaultCfg := conduit.DefaultConfigWithBasePath(tmpDir)
+
+ tests := []struct {
+ name string
+ flags *RootFlags
+ configFile string
+ envVars map[string]string
+ assertFunc func(*isT.I, conduit.Config)
+ }{
+ {
+ name: "default values only",
+ flags: &RootFlags{
+ ConduitConfigPath: "nonexistent.yaml",
+ Config: conduit.Config{
+ DB: conduit.DefaultConfig().DB,
+ Log: conduit.DefaultConfig().Log,
+ API: conduit.DefaultConfig().API,
+ },
+ },
+ assertFunc: func(is *isT.I, cfg conduit.Config) {
+ is.Equal(cfg.DB.Type, defaultCfg.DB.Type)
+ is.Equal(cfg.Log.Level, defaultCfg.Log.Level)
+ is.Equal(cfg.API.Enabled, defaultCfg.API.Enabled)
+ },
+ },
+ {
+ name: "config file overrides defaults",
+ flags: &RootFlags{
+ ConduitConfigPath: configPath,
+ },
+ assertFunc: func(is *isT.I, cfg conduit.Config) {
+ is.Equal(cfg.DB.Type, "sqlite")
+ is.Equal(cfg.DB.SQLite.Path, "/custom/path/db")
+ is.Equal(cfg.Log.Level, "debug")
+ is.Equal(cfg.Log.Format, "json")
+ is.Equal(cfg.API.Enabled, false)
+ },
+ },
+ {
+ name: "env vars override config file",
+ flags: &RootFlags{
+ ConduitConfigPath: configPath,
+ },
+ envVars: map[string]string{
+ "CONDUIT_DB_TYPE": "postgres",
+ "CONDUIT_LOG_LEVEL": "warn",
+ "CONDUIT_API_ENABLED": "true",
+ },
+ assertFunc: func(is *isT.I, cfg conduit.Config) {
+ is.Equal(cfg.DB.Type, "postgres")
+ is.Equal(cfg.Log.Level, "warn")
+ is.Equal(cfg.API.Enabled, true)
+ // Config file values that weren't overridden should remain
+ is.Equal(cfg.Log.Format, "json")
+ },
+ },
+ {
+ name: "flags override everything",
+ flags: &RootFlags{
+ ConduitConfigPath: configPath,
+ Config: conduit.Config{
+ DB: conduit.ConfigDB{
+ Type: "sqlite",
+ },
+ Log: conduit.ConfigLog{
+ Level: "error",
+ Format: "text",
+ },
+ API: conduit.ConfigAPI{
+ Enabled: true,
+ },
+ },
+ },
+ envVars: map[string]string{
+ "CONDUIT_DB_TYPE": "postgres",
+ "CONDUIT_LOG_LEVEL": "warn",
+ "CONDUIT_API_ENABLED": "false",
+ },
+ assertFunc: func(is *isT.I, cfg conduit.Config) {
+ is.Equal(cfg.DB.Type, "sqlite")
+ is.Equal(cfg.Log.Level, "error")
+ is.Equal(cfg.Log.Format, "text")
+ is.Equal(cfg.API.Enabled, true)
+ },
+ },
+ {
+ name: "partial overrides",
+ flags: &RootFlags{
+ ConduitConfigPath: configPath,
+ Config: conduit.Config{
+ Log: conduit.ConfigLog{Level: "warn"},
+ },
+ },
+ envVars: map[string]string{
+ "CONDUIT_DB_TYPE": "postgres",
+ },
+ assertFunc: func(is *isT.I, cfg conduit.Config) {
+ is.Equal(cfg.DB.Type, "postgres")
+ is.Equal(cfg.Log.Level, "warn")
+ is.Equal(cfg.Log.Format, "json")
+ is.Equal(cfg.API.Enabled, defaultCfg.API.Enabled)
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ is := isT.New(t)
+
+ os.Clearenv()
+
+ for k, v := range tt.envVars {
+ os.Setenv(k, v)
+ }
+ defer func() {
+ for k := range tt.envVars {
+ os.Unsetenv(k)
+ }
+ }()
+
+ c := &RootCommand{
+ flags: *tt.flags,
+ }
+
+ c.cfg = conduit.DefaultConfigWithBasePath(tmpDir)
+
+ err := c.updateConfig()
+ is.NoErr(err)
+
+ tt.assertFunc(is, c.cfg)
+ })
+ }
+}
diff --git a/go.mod b/go.mod
index 071aa3b80..84e359965 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
github.com/conduitio/conduit-connector-sdk v0.12.0
github.com/conduitio/conduit-processor-sdk v0.4.0
github.com/conduitio/conduit-schema-registry v0.2.2
+ github.com/conduitio/ecdysis v0.0.0-20241104140515-1031f323f080
github.com/conduitio/yaml/v3 v3.3.0
github.com/dop251/goja v0.0.0-20240806095544-3491d4a58fbe
github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c
@@ -34,15 +35,13 @@ require (
github.com/jpillora/backoff v1.0.0
github.com/matryer/is v1.4.1
github.com/neilotoole/slogt v1.1.0
- github.com/peterbourgon/ff/v3 v3.4.0
github.com/piotrkowalczuk/promgrpc/v4 v4.1.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.1
github.com/rs/zerolog v1.33.0
github.com/sourcegraph/conc v0.3.0
- github.com/spf13/cobra v1.8.1
- github.com/spf13/pflag v1.0.5
+ github.com/spf13/viper v1.19.0
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.8.2
github.com/twmb/franz-go/pkg/sr v1.2.0
@@ -320,7 +319,8 @@ require (
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
- github.com/spf13/viper v1.19.0 // indirect
+ github.com/spf13/cobra v1.8.1 // indirect
+ github.com/spf13/pflag v1.0.5 // indirect
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
@@ -388,3 +388,5 @@ require (
mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
pluginrpc.com/pluginrpc v0.5.0 // indirect
)
+
+replace github.com/conduitio/ecdysis => ../ecdysis
diff --git a/go.sum b/go.sum
index c2af8e074..b0f88218c 100644
--- a/go.sum
+++ b/go.sum
@@ -718,8 +718,6 @@ github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH
github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
-github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc=
-github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyXE/eKY9RztQ=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go
index aba89f144..b9523d4fd 100644
--- a/pkg/conduit/config.go
+++ b/pkg/conduit/config.go
@@ -39,50 +39,53 @@ const (
SchemaRegistryTypeBuiltin = "builtin"
)
-// Config holds all configurable values for Conduit.
-type Config struct {
- DB struct {
- // When Driver is specified it takes precedence over other DB related
- // fields.
- Driver database.DB
-
- Type string
- Badger struct {
- Path string
- }
- Postgres struct {
- ConnectionString string
- Table string
- }
- SQLite struct {
- Path string
- Table string
- }
+type ConfigDB struct {
+ // When Driver is specified it takes precedence over other DB related
+ // fields.
+ Driver database.DB
+
+ Type string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"`
+ Badger struct {
+ Path string `long:"db.badger.path" usage:"path to badger DB"`
}
-
- API struct {
- Enabled bool
-
- HTTP struct {
- Address string
- }
- GRPC struct {
- Address string
- }
+ Postgres struct {
+ ConnectionString string `long:"db.postgres.connection-string" usage:"postgres connection string, may be a database URL or in PostgreSQL keyword/value format"`
+ Table string `long:"db.postgres.table" usage:"postgres table in which to store data (will be created if it does not exist)"`
}
+ SQLite struct {
+ Path string `long:"db.sqlite.path" usage:"path to sqlite3 DB"`
+ Table string `long:"db.sqlite.table" usage:"sqlite3 table in which to store data (will be created if it does not exist)"`
+ }
+}
- Log struct {
- NewLogger func(level, format string) log.CtxLogger
- Level string
- Format string
+type ConfigAPI struct {
+ Enabled bool `long:"api.enabled" usage:"enable HTTP and gRPC API"`
+ HTTP struct {
+ Address string `long:"http.address" usage:"address for serving the HTTP API"`
+ }
+ GRPC struct {
+ Address string `long:"grpc.address" usage:"address for serving the gRPC API"`
}
+}
+
+type ConfigLog struct {
+ NewLogger func(level, format string) log.CtxLogger
+ Level string `long:"log.level" usage:"sets logging level; accepts debug, info, warn, error, trace"`
+ Format string `long:"log.format" usage:"sets the format of the logging; accepts json, cli"`
+}
+
+// Config holds all configurable values for Conduit.
+type Config struct {
+ DB ConfigDB
+ API ConfigAPI
+ Log ConfigLog
Connectors struct {
- Path string
+ Path string `long:"connectors.path" usage:"path to standalone connectors' directory"`
}
Processors struct {
- Path string
+ Path string `long:"processors.path" usage:"path to standalone processors' directory"`
}
Pipelines struct {
@@ -90,37 +93,37 @@ type Config struct {
ExitOnDegraded bool
ErrorRecovery struct {
// MinDelay is the minimum delay before restart: Default: 1 second
- MinDelay time.Duration
+ MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"`
// MaxDelay is the maximum delay before restart: Default: 10 minutes
- MaxDelay time.Duration
+ MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"`
// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
- BackoffFactor int
+ BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"`
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
- MaxRetries int64
+ MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"`
// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
- MaxRetriesWindow time.Duration
+ MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
}
}
ConnectorPlugins map[string]sdk.Connector
SchemaRegistry struct {
- Type string
+ Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`
Confluent struct {
- ConnectionString string
+ ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"`
}
}
Preview struct {
// PipelineArchV2 enables the new pipeline architecture.
- PipelineArchV2 bool
+ PipelineArchV2 bool `long:"preview.pipeline-arch-v2" usage:"enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"`
}
- dev struct {
- cpuprofile string
- memprofile string
- blockprofile string
+ Dev struct {
+ CPUProfile string `long:"dev.cpuprofile" usage:"write CPU profile to file"`
+ MemProfile string `long:"dev.memprofile" usage:"write memory profile to file"`
+ BlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"`
}
}
diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go
index 259dd3fdf..e9cce35c1 100644
--- a/pkg/conduit/entrypoint.go
+++ b/pkg/conduit/entrypoint.go
@@ -16,35 +16,18 @@ package conduit
import (
"context"
- "flag"
"fmt"
"os"
"os/signal"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
- "github.com/peterbourgon/ff/v3"
- "github.com/peterbourgon/ff/v3/ffyaml"
)
const (
exitCodeErr = 1
exitCodeInterrupt = 2
-
- // Deprecated: Use `pipelines.error-recovery.exit-on-degraded` instead.
- FlagPipelinesExitOnError = "pipelines.exit-on-error"
)
-// HiddenFlags is a map of flags that should not be shown in the help output.
-var HiddenFlags = map[string]bool{
- FlagPipelinesExitOnError: true,
-}
-
-// Serve is a shortcut for Entrypoint.Serve.
-func Serve(cfg Config) {
- e := &Entrypoint{}
- e.Serve(cfg)
-}
-
// Entrypoint provides methods related to the Conduit entrypoint (parsing
// config, managing interrupt signals etc.).
type Entrypoint struct{}
@@ -59,9 +42,6 @@ type Entrypoint struct{}
// - environment variables
// - config file (lowest priority)
func (e *Entrypoint) Serve(cfg Config) {
- flags := Flags(&cfg)
- e.ParseConfig(flags)
-
if cfg.Log.Format == "cli" {
_, _ = fmt.Fprintf(os.Stdout, "%s\n", e.Splash())
}
@@ -79,140 +59,6 @@ func (e *Entrypoint) Serve(cfg Config) {
}
}
-// Flags returns a flag set that, when parsed, stores the values in the provided
-// config struct.
-func Flags(cfg *Config) *flag.FlagSet {
- // TODO extract flags from config struct rather than defining flags manually
- flags := flag.NewFlagSet("conduit", flag.ExitOnError)
-
- flags.StringVar(&cfg.DB.Type, "db.type", cfg.DB.Type, "database type; accepts badger,postgres,inmemory,sqlite")
- flags.StringVar(&cfg.DB.Badger.Path, "db.badger.path", cfg.DB.Badger.Path, "path to badger DB")
- flags.StringVar(
- &cfg.DB.Postgres.ConnectionString,
- "db.postgres.connection-string",
- cfg.DB.Postgres.ConnectionString,
- "postgres connection string, may be a database URL or in PostgreSQL keyword/value format",
- )
- flags.StringVar(&cfg.DB.Postgres.Table, "db.postgres.table", cfg.DB.Postgres.Table, "postgres table in which to store data (will be created if it does not exist)")
- flags.StringVar(&cfg.DB.SQLite.Path, "db.sqlite.path", cfg.DB.SQLite.Path, "path to sqlite3 DB")
- flags.StringVar(&cfg.DB.SQLite.Table, "db.sqlite.table", cfg.DB.SQLite.Table, "sqlite3 table in which to store data (will be created if it does not exist)")
- flags.BoolVar(&cfg.API.Enabled, "api.enabled", cfg.API.Enabled, "enable HTTP and gRPC API")
- flags.StringVar(&cfg.API.HTTP.Address, "http.address", cfg.API.HTTP.Address, "address for serving the HTTP API")
- flags.StringVar(&cfg.API.GRPC.Address, "grpc.address", cfg.API.GRPC.Address, "address for serving the gRPC API")
-
- flags.StringVar(&cfg.Log.Level, "log.level", cfg.Log.Level, "sets logging level; accepts debug, info, warn, error, trace")
- flags.StringVar(&cfg.Log.Format, "log.format", cfg.Log.Format, "sets the format of the logging; accepts json, cli")
-
- flags.StringVar(&cfg.Connectors.Path, "connectors.path", cfg.Connectors.Path, "path to standalone connectors' directory")
- flags.StringVar(&cfg.Processors.Path, "processors.path", cfg.Processors.Path, "path to standalone processors' directory")
-
- // Pipeline configuration
- flags.StringVar(
- &cfg.Pipelines.Path,
- "pipelines.path",
- cfg.Pipelines.Path,
- "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file",
- )
-
- // Deprecated: use `pipelines.exit-on-degraded` instead
- // Note: If both `pipeline.exit-on-error` and `pipeline.exit-on-degraded` are set, `pipeline.exit-on-degraded` will take precedence
- flags.BoolVar(
- &cfg.Pipelines.ExitOnDegraded,
- FlagPipelinesExitOnError,
- cfg.Pipelines.ExitOnDegraded,
- "Deprecated: use `exit-on-degraded` instead.\nexit Conduit if a pipeline experiences an error while running",
- )
-
- flags.BoolVar(
- &cfg.Pipelines.ExitOnDegraded,
- "pipelines.exit-on-degraded",
- cfg.Pipelines.ExitOnDegraded,
- "exit Conduit if a pipeline enters a degraded state",
- )
-
- flags.DurationVar(
- &cfg.Pipelines.ErrorRecovery.MinDelay,
- "pipelines.error-recovery.min-delay",
- cfg.Pipelines.ErrorRecovery.MinDelay,
- "minimum delay before restart",
- )
- flags.DurationVar(
- &cfg.Pipelines.ErrorRecovery.MaxDelay,
- "pipelines.error-recovery.max-delay",
- cfg.Pipelines.ErrorRecovery.MaxDelay,
- "maximum delay before restart",
- )
- flags.IntVar(
- &cfg.Pipelines.ErrorRecovery.BackoffFactor,
- "pipelines.error-recovery.backoff-factor",
- cfg.Pipelines.ErrorRecovery.BackoffFactor,
- "backoff factor applied to the last delay",
- )
- flags.Int64Var(
- &cfg.Pipelines.ErrorRecovery.MaxRetries,
- "pipelines.error-recovery.max-retries",
- cfg.Pipelines.ErrorRecovery.MaxRetries,
- "maximum number of retries",
- )
- flags.DurationVar(
- &cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
- "pipelines.error-recovery.max-retries-window",
- cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
- "amount of time running without any errors after which a pipeline is considered healthy",
- )
-
- flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent")
- flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")
-
- flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)")
-
- flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
- flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file")
- flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file")
-
- // show user or dev flags
- flags.Usage = func() {
- tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError)
-
- // preserve original flag's output to the same writer
- tmpFlags.SetOutput(flags.Output())
-
- flags.VisitAll(func(f *flag.Flag) {
- if HiddenFlags[f.Name] {
- return
- }
- // reset value to its default, to ensure default is shown correctly
- _ = f.Value.Set(f.DefValue)
- tmpFlags.Var(f.Value, f.Name, f.Usage)
- })
- tmpFlags.Usage()
- }
-
- return flags
-}
-
-func (e *Entrypoint) ParseConfig(flags *flag.FlagSet) {
- _ = flags.String("config", "conduit.yaml", "global config file")
- version := flags.Bool("version", false, "prints current Conduit version")
-
- // flags is set up to exit on error, we can safely ignore the error
- err := ff.Parse(flags, os.Args[1:],
- ff.WithEnvVarPrefix("CONDUIT"),
- ff.WithConfigFileFlag("config"),
- ff.WithConfigFileParser(ffyaml.Parser),
- ff.WithAllowMissingConfigFile(true),
- )
- if err != nil {
- e.exitWithError(err)
- }
-
- // check if the -version flag is set
- if *version {
- _, _ = fmt.Fprintf(os.Stdout, "%s\n", Version(true))
- os.Exit(0)
- }
-}
-
// CancelOnInterrupt returns a context that is canceled when the interrupt
// signal is received.
// * After the first signal the function will continue to listen
diff --git a/pkg/conduit/entrypoint_test.go b/pkg/conduit/entrypoint_test.go
deleted file mode 100644
index 21c56bdad..000000000
--- a/pkg/conduit/entrypoint_test.go
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright © 2024 Meroxa, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package conduit
-
-import (
- "bytes"
- "strings"
- "testing"
-)
-
-func TestFlags_HelpOutput(t *testing.T) {
- var buf bytes.Buffer
-
- flags := Flags(&Config{})
- flags.SetOutput(&buf)
-
- flags.Usage()
- output := buf.String()
-
- expectedFlags := []string{
- "db.type",
- "db.badger.path",
- "db.postgres.connection-string",
- "db.postgres.table",
- "db.sqlite.path",
- "db.sqlite.table",
- "dev.blockprofileblockprofile",
- "dev.cpuprofile",
- "dev.memprofile",
- "api.enabled",
- "http.address",
- "grpc.address",
- "log.level",
- "log.format",
- "connectors.path",
- "processors.path",
- "pipelines.path",
- "pipelines.exit-on-degraded",
- "pipelines.error-recovery.min-delay",
- "pipelines.error-recovery.max-delay",
- "pipelines.error-recovery.backoff-factor",
- "pipelines.error-recovery.max-retries",
- "pipelines.error-recovery.max-retries-window",
- "schema-registry.type",
- "schema-registry.confluent.connection-string",
- "preview.pipeline-arch-v2",
- }
-
- unexpectedFlags := []string{
- FlagPipelinesExitOnError,
- }
-
- for _, flag := range expectedFlags {
- if !strings.Contains(output, flag) {
- t.Errorf("expected flag %q not found in help output", flag)
- }
- }
-
- for _, flag := range unexpectedFlags {
- if strings.Contains(output, flag) {
- t.Errorf("unexpected flag %q found in help output", flag)
- }
- }
-}
diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go
index 74df8dcb2..c9c94d2c6 100644
--- a/pkg/conduit/runtime.go
+++ b/pkg/conduit/runtime.go
@@ -388,8 +388,8 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
}
}()
- if r.Config.dev.cpuprofile != "" {
- f, err := os.Create(r.Config.dev.cpuprofile)
+ if r.Config.Dev.CPUProfile != "" {
+ f, err := os.Create(r.Config.Dev.CPUProfile)
if err != nil {
return deferred, cerrors.Errorf("could not create CPU profile: %w", err)
}
@@ -399,9 +399,9 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
}
deferFunc(pprof.StopCPUProfile)
}
- if r.Config.dev.memprofile != "" {
+ if r.Config.Dev.MemProfile != "" {
deferFunc(func() {
- f, err := os.Create(r.Config.dev.memprofile)
+ f, err := os.Create(r.Config.Dev.MemProfile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create memory profile")
return
@@ -413,10 +413,10 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
}
})
}
- if r.Config.dev.blockprofile != "" {
+ if r.Config.Dev.BlockProfile != "" {
runtime.SetBlockProfileRate(1)
deferFunc(func() {
- f, err := os.Create(r.Config.dev.blockprofile)
+ f, err := os.Create(r.Config.Dev.BlockProfile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create block profile")
return