Skip to content

Commit

Permalink
Update protocol (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jul 31, 2024
1 parent a0f521f commit 05f6d05
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.5
require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240724140503-f2dd0a3e4d36
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980
github.com/goccy/go-json v0.10.3
github.com/golangci/golangci-lint v1.59.1
github.com/google/go-cmp v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/ckaznocha/intrange v0.1.2 h1:3Y4JAxcMntgb/wABQ6e8Q8leMd26JbX2790lIss9
github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE=
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2 h1:0Ba/B4lyxeGIVk4zvVGRx1kAdLuXK+8st/LyMKZCmu4=
github.com/conduitio/conduit-commons v0.2.1-0.20240717151024-0c8d1f406cb2/go.mod h1:w0eHaH81yoab8VcrrTjFGNGRQMx45RnXWobKMpKjgrM=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240724140503-f2dd0a3e4d36 h1:/tUV6UUdWRiJQV6gLMYmvwOkjJjwd74H3YAibU3vhK4=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240724140503-f2dd0a3e4d36/go.mod h1:hjnONUqZv2Lb+W9hUP2fSP8gjvlM2WJiUw47n4dDqBA=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980 h1:Hwg9Ho0Rvrg0rVype0yQA7jJtGrG4zY6RQzvcXAi4Kk=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980/go.mod h1:GyI6kkdR55JGM/96v5OSI7vlVodur3L22SY+OJbPd0s=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/curioswitch/go-reassign v0.2.0 h1:G9UZyOcpk/d7Gd6mqYgd8XYWFMw/znxwGDUstnC9DIo=
Expand Down
4 changes: 2 additions & 2 deletions internal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package internal
import (
"context"

"github.com/conduitio/conduit-connector-protocol/pconduit"
"github.com/conduitio/conduit-connector-protocol/pconnector"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
)

// -- Connector ID ----------------------------------------------------------
Expand Down Expand Up @@ -68,7 +68,7 @@ func Enrich(ctx context.Context, cfg pconnector.PluginConfig) context.Context {
// The connector ID is expected to be used more often than the log level.
ctx = ContextWithLogLevel(ctx, cfg.LogLevel)
ctx = ContextWithConnectorID(ctx, cfg.ConnectorID)
ctx = pconduit.ContextWithConnectorToken(ctx, cfg.Token)
ctx = pconnutils.ContextWithConnectorToken(ctx, cfg.Token)

return ctx
}
4 changes: 2 additions & 2 deletions internal/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-connector-protocol/pconduit"
"github.com/conduitio/conduit-connector-protocol/pconnector"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
"github.com/matryer/is"
)

Expand All @@ -33,7 +33,7 @@ func TestContextUtils_Enrich(t *testing.T) {
}

got := Enrich(ctx, cfg)
is.Equal(cfg.Token, pconduit.ConnectorTokenFromContext(got))
is.Equal(cfg.Token, pconnutils.ConnectorTokenFromContext(got))
is.Equal(cfg.ConnectorID, ConnectorIDFromContext(got))
is.Equal(cfg.LogLevel, LogLevelFromContext(got))
}
18 changes: 9 additions & 9 deletions schema/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync"

"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-protocol/pconduit"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
)

type inMemoryService struct {
Expand All @@ -31,15 +31,15 @@ type inMemoryService struct {
m sync.Mutex
}

func newInMemoryService() pconduit.SchemaService {
func newInMemoryService() pconnutils.SchemaService {
return &inMemoryService{
schemas: make(map[string][]schema.Schema),
}
}

func (s *inMemoryService) CreateSchema(_ context.Context, request pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) {
func (s *inMemoryService) CreateSchema(_ context.Context, request pconnutils.CreateSchemaRequest) (pconnutils.CreateSchemaResponse, error) {
if request.Type != schema.TypeAvro {
return pconduit.CreateSchemaResponse{}, ErrInvalidSchemaType
return pconnutils.CreateSchemaResponse{}, fmt.Errorf("unsupported schema type: %s", request.Type)
}

s.m.Lock()
Expand All @@ -53,21 +53,21 @@ func (s *inMemoryService) CreateSchema(_ context.Context, request pconduit.Creat
}
s.schemas[request.Subject] = append(s.schemas[request.Subject], inst)

return pconduit.CreateSchemaResponse{Schema: inst}, nil
return pconnutils.CreateSchemaResponse{Schema: inst}, nil
}

func (s *inMemoryService) GetSchema(_ context.Context, request pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) {
func (s *inMemoryService) GetSchema(_ context.Context, request pconnutils.GetSchemaRequest) (pconnutils.GetSchemaResponse, error) {
s.m.Lock()
defer s.m.Unlock()

versions, ok := s.schemas[request.Subject]
if !ok {
return pconduit.GetSchemaResponse{}, fmt.Errorf("subject %v: %w", request.Subject, ErrSchemaNotFound)
return pconnutils.GetSchemaResponse{}, fmt.Errorf("subject %v: %w", request.Subject, ErrSubjectNotFound)
}

if len(versions) < request.Version {
return pconduit.GetSchemaResponse{}, fmt.Errorf("version %v: %w", request.Version, ErrSchemaNotFound)
return pconnutils.GetSchemaResponse{}, fmt.Errorf("version %v: %w", request.Version, ErrVersionNotFound)
}

return pconduit.GetSchemaResponse{Schema: versions[request.Version-1]}, nil
return pconnutils.GetSchemaResponse{Schema: versions[request.Version-1]}, nil
}
15 changes: 7 additions & 8 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"fmt"

"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-protocol/pconduit"
"github.com/conduitio/conduit-connector-protocol/pconduit/v1/client"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
"github.com/conduitio/conduit-connector-protocol/pconnutils/v1/client"
"github.com/conduitio/conduit-connector-sdk/internal"
"google.golang.org/grpc"
)
Expand All @@ -30,10 +30,9 @@ func init() {
}

var (
ErrSchemaNotFound = pconduit.ErrSchemaNotFound
ErrInvalidSchemaSubject = pconduit.ErrInvalidSchemaSubject
ErrInvalidSchemaType = pconduit.ErrInvalidSchemaType
ErrInvalidSchemaBytes = pconduit.ErrInvalidSchemaBytes
ErrSubjectNotFound = pconnutils.ErrSubjectNotFound
ErrVersionNotFound = pconnutils.ErrVersionNotFound
ErrInvalidSchema = pconnutils.ErrInvalidSchema
)

// Service is the schema service client that can be used to interact with the schema service.
Expand All @@ -42,7 +41,7 @@ var Service = newInMemoryService()

// Create creates a new schema with the given name and bytes. The schema type must be Avro.
func Create(ctx context.Context, typ schema.Type, subject string, bytes []byte) (schema.Schema, error) {
resp, err := Service.CreateSchema(ctx, pconduit.CreateSchemaRequest{
resp, err := Service.CreateSchema(ctx, pconnutils.CreateSchemaRequest{
Subject: qualifiedSubject(ctx, subject),
Type: typ,
Bytes: bytes,
Expand All @@ -68,7 +67,7 @@ func qualifiedSubject(ctx context.Context, subject string) string {

// Get retrieves the schema with the given name and version. If the schema does not exist, an error is returned.
func Get(ctx context.Context, subject string, version int) (schema.Schema, error) {
resp, err := Service.GetSchema(ctx, pconduit.GetSchemaRequest{
resp, err := Service.GetSchema(ctx, pconnutils.GetSchemaRequest{
Subject: subject,
Version: version,
})
Expand Down
16 changes: 8 additions & 8 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"fmt"
"os"

"github.com/conduitio/conduit-connector-protocol/pconduit"
"github.com/conduitio/conduit-connector-protocol/pconnector"
"github.com/conduitio/conduit-connector-protocol/pconnector/server"
"github.com/conduitio/conduit-connector-protocol/pconnutils"
"github.com/conduitio/conduit-connector-sdk/internal"
)

Expand Down Expand Up @@ -81,17 +81,17 @@ func serve(c Connector) error {
}

func getPluginConfig() (pconnector.PluginConfig, error) {
token := os.Getenv(pconduit.EnvConduitConnectorToken)
token := os.Getenv(pconnutils.EnvConduitConnectorToken)
if token == "" {
return pconnector.PluginConfig{}, missingEnvError(pconduit.EnvConduitConnectorToken, "v0.11.0")
return pconnector.PluginConfig{}, missingEnvError(pconnutils.EnvConduitConnectorToken, "v0.11.0")
}

connectorID := os.Getenv(pconduit.EnvConduitConnectorID)
connectorID := os.Getenv(pconnutils.EnvConduitConnectorID)
if connectorID == "" {
return pconnector.PluginConfig{}, missingEnvError(pconduit.EnvConduitConnectorID, "v0.11.0")
return pconnector.PluginConfig{}, missingEnvError(pconnutils.EnvConduitConnectorID, "v0.11.0")
}

logLevel := os.Getenv(pconduit.EnvConduitLogLevel)
logLevel := os.Getenv(pconnutils.EnvConduitLogLevel)

return pconnector.PluginConfig{
Token: token,
Expand All @@ -104,9 +104,9 @@ func getPluginConfig() (pconnector.PluginConfig, error) {
// The values are fetched from environment variables provided by conduit-connector-protocol.
// The function returns an error if the environment variables are not specified or empty.
func connectorUtilitiesGRPCTarget() (string, error) {
target := os.Getenv(pconduit.EnvConduitConnectorUtilitiesGRPCTarget)
target := os.Getenv(pconnutils.EnvConduitConnectorUtilitiesGRPCTarget)
if target == "" {
return "", missingEnvError(pconduit.EnvConduitConnectorUtilitiesGRPCTarget, "v0.11.0")
return "", missingEnvError(pconnutils.EnvConduitConnectorUtilitiesGRPCTarget, "v0.11.0")
}

return target, nil
Expand Down

0 comments on commit 05f6d05

Please sign in to comment.