-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdestination.go
77 lines (64 loc) · 2.73 KB
/
destination.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package connectorname
//go:generate paramgen -output=paramgen_dest.go DestinationConfig
import (
"context"
"fmt"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)
type Destination struct {
sdk.UnimplementedDestination
config DestinationConfig
}
type DestinationConfig struct {
// Config includes parameters that are the same in the source and destination.
Config
// DestinationConfigParam must be either yes or no (defaults to yes).
DestinationConfigParam string `validate:"inclusion=yes|no" default:"yes"`
}
func NewDestination() sdk.Destination {
// Create Destination and wrap it in the default middleware.
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}
func (d *Destination) Parameters() config.Parameters {
// Parameters is a map of named Parameters that describe how to configure
// the Destination. Parameters can be generated from DestinationConfig with
// paramgen.
return d.config.Parameters()
}
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
// Configure is the first function to be called in a connector. It provides
// the connector with the configuration that can be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The SDK will validate the configuration and populate default values
// before calling Configure. If you need to do more complex validations you
// can do them manually here.
sdk.Logger(ctx).Info().Msg("Configuring Destination...")
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}
return nil
}
func (d *Destination) Open(_ context.Context) error {
// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
// this function.
return nil
}
func (d *Destination) Write(_ context.Context, _ []opencdc.Record) (int, error) {
// Write writes len(r) records from r to the destination right away without
// caching. It should return the number of records written from r
// (0 <= n <= len(r)) and any error encountered that caused the write to
// stop early. Write must return a non-nil error if it returns n < len(r).
return 0, nil
}
func (d *Destination) Teardown(_ context.Context) error {
// Teardown signals to the plugin that all records were written and there
// will be no more calls to any other function. After Teardown returns, the
// plugin should be ready for a graceful shutdown.
return nil
}