Skip to content

Commit

Permalink
Update SDK, use paramgen (#62)
Browse files Browse the repository at this point in the history
* update SDK, use paramgen for config

* add tools.go, update CI actions to take versions from go.mod

* return destination parameter specs

* update to work with new sdk
  • Loading branch information
lovromazgon authored Jun 21, 2024
1 parent 3794afa commit 221c138
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 281 deletions.
2 changes: 2 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func TestAcceptance(t *testing.T) {
goleak.IgnoreTopFunction("syscall.Syscall6"), // linux
goleak.IgnoreTopFunction("syscall.syscall6"), // darwin
},
ReadTimeout: time.Second,
WriteTimeout: time.Second,
},
})
}
18 changes: 12 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ package file

import (
"fmt"
"os"
)

const (
// ConfigPath is the config name for the path to the file.
ConfigPath = "path"
)
type Config struct {
// Path is the file path used by the connector to read/write records.
Path string `json:"path" validate:"required"`
}

func requiredConfigErr(name string) error {
return fmt.Errorf("%q config value must be set", name)
func (c Config) Validate() error {
// make sure we can stat the file, we don't care if it doesn't exist though
_, err := os.Stat(c.Path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf(`config value "path" does not contain a valid path: %w`, err)
}
return nil
}
72 changes: 27 additions & 45 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,55 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen -output destination_paramgen.go DestinationConfig

package file

import (
"bytes"
"context"
"fmt"
"os"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Destination connector
type Destination struct {
sdk.UnimplementedDestination

config map[string]string
config DestinationConfig

buf bytes.Buffer
file *os.File
}

type DestinationConfig struct {
Config // embed the global config
}

func (c DestinationConfig) Validate() error { return c.Config.Validate() }

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
"path": {
Default: "",
Description: "the file path where the file destination writes messages",
Required: true,
},
}
func (d *Destination) Parameters() config.Parameters {
return d.config.Parameters()
}

func (d *Destination) Configure(_ context.Context, m map[string]string) error {
err := d.validateConfig(m)
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
err = d.config.Validate()
if err != nil {
return err
}
d.config = m
return nil
}

func (d *Destination) Open(_ context.Context) error {
file, err := d.openOrCreate(d.config[ConfigPath])
func (d *Destination) Open(context.Context) error {
file, err := d.openOrCreate(d.config.Path)
if err != nil {
return err
}
Expand All @@ -66,15 +69,12 @@ func (d *Destination) Open(_ context.Context) error {
return nil
}

func (d *Destination) Write(_ context.Context, recs []sdk.Record) (int, error) {
defer d.buf.Reset() // always reset buffer after write
for _, r := range recs {
d.buf.Write(r.Bytes())
d.buf.WriteRune('\n')
}
_, err := d.buf.WriteTo(d.file)
if err != nil {
return 0, err
func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error) {
for i, r := range recs {
_, err := d.file.Write(append(r.Bytes(), '\n'))
if err != nil {
return i, err
}
}
return len(recs), nil
}
Expand Down Expand Up @@ -107,21 +107,3 @@ func (d *Destination) openOrCreate(path string) (*os.File, error) {

return file, nil
}

func (d *Destination) validateConfig(cfg map[string]string) error {
path, ok := cfg[ConfigPath]
if !ok {
return requiredConfigErr(ConfigPath)
}

// make sure we can stat the file, we don't care if it doesn't exist though
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf(
"%q config value %q does not contain a valid path: %w",
ConfigPath, path, err,
)
}

return nil
}
21 changes: 21 additions & 0 deletions destination_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 221c138

Please sign in to comment.