Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SDK, use paramgen #62

Merged
merged 4 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func TestAcceptance(t *testing.T) {
goleak.IgnoreTopFunction("syscall.Syscall6"), // linux
goleak.IgnoreTopFunction("syscall.syscall6"), // darwin
},
ReadTimeout: time.Second,
WriteTimeout: time.Second,
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
},
})
}
18 changes: 12 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ package file

import (
"fmt"
"os"
)

const (
// ConfigPath is the config name for the path to the file.
ConfigPath = "path"
)
type Config struct {
// Path is the file path used by the connector to read/write records.
Path string `json:"path" validate:"required"`
}

func requiredConfigErr(name string) error {
return fmt.Errorf("%q config value must be set", name)
func (c Config) Validate() error {
// make sure we can stat the file, we don't care if it doesn't exist though
_, err := os.Stat(c.Path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf(`config value "path" does not contain a valid path: %w`, err)
}
return nil
}
72 changes: 27 additions & 45 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,55 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen -output destination_paramgen.go DestinationConfig

package file

import (
"bytes"
"context"
"fmt"
"os"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Destination connector
type Destination struct {
sdk.UnimplementedDestination

config map[string]string
config DestinationConfig

buf bytes.Buffer
file *os.File
}

type DestinationConfig struct {
Config // embed the global config
}

func (c DestinationConfig) Validate() error { return c.Config.Validate() }

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
"path": {
Default: "",
Description: "the file path where the file destination writes messages",
Required: true,
},
}
func (d *Destination) Parameters() config.Parameters {
return d.config.Parameters()
}

func (d *Destination) Configure(_ context.Context, m map[string]string) error {
err := d.validateConfig(m)
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
err = d.config.Validate()
if err != nil {
return err
}
d.config = m
return nil
}

func (d *Destination) Open(_ context.Context) error {
file, err := d.openOrCreate(d.config[ConfigPath])
func (d *Destination) Open(context.Context) error {
file, err := d.openOrCreate(d.config.Path)
if err != nil {
return err
}
Expand All @@ -66,15 +69,12 @@ func (d *Destination) Open(_ context.Context) error {
return nil
}

func (d *Destination) Write(_ context.Context, recs []sdk.Record) (int, error) {
defer d.buf.Reset() // always reset buffer after write
for _, r := range recs {
d.buf.Write(r.Bytes())
d.buf.WriteRune('\n')
}
_, err := d.buf.WriteTo(d.file)
if err != nil {
return 0, err
func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error) {
for i, r := range recs {
_, err := d.file.Write(append(r.Bytes(), '\n'))
if err != nil {
return i, err
}
}
return len(recs), nil
}
Expand Down Expand Up @@ -107,21 +107,3 @@ func (d *Destination) openOrCreate(path string) (*os.File, error) {

return file, nil
}

func (d *Destination) validateConfig(cfg map[string]string) error {
path, ok := cfg[ConfigPath]
if !ok {
return requiredConfigErr(ConfigPath)
}

// make sure we can stat the file, we don't care if it doesn't exist though
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf(
"%q config value %q does not contain a valid path: %w",
ConfigPath, path, err,
)
}

return nil
}
21 changes: 21 additions & 0 deletions destination_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading