From a9812b7cd2df1d4870632bb748e4677a74e31e53 Mon Sep 17 00:00:00 2001 From: Maha Hajja <82542081+maha-hajja@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:11:00 -0700 Subject: [PATCH 1/3] add getSchema and createSchema for wasm processors (#1683) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add getSchema and setSchema for wasm processors * address reviews * fix bug * update processor sdk references * use schema registry + in memory schema for tests * linter fix * update processor SDK * fix linter + fix main merge * rename conduitv1 import to pconduitv1 * update conduitv1 to procutils from processorSDK * go mod tidy * create utility function for schema requests * resolve conflicts * resolve conflicts * go mod tidy * refactor a bit, add tests * update processor-sdk dependency * fix linter * use ErrorCodeInternal * update to latest processor sdk --------- Co-authored-by: Lovro Mažgon --- go.mod | 8 +- go.sum | 16 +-- pkg/conduit/runtime.go | 27 ++-- pkg/plugin/connector/connutils/schema.go | 19 ++- pkg/plugin/processor/procutils/schema.go | 95 +++++++++++++ .../processor/standalone/host_module.go | 127 +++++++++++++++++- pkg/plugin/processor/standalone/processor.go | 3 + .../processor/standalone/processor_test.go | 48 ++++--- pkg/plugin/processor/standalone/proto.go | 4 +- pkg/plugin/processor/standalone/registry.go | 18 ++- .../processor/standalone/registry_test.go | 9 +- .../test/wasm_processors/chaos/processor.go | 94 +++++++++---- 12 files changed, 375 insertions(+), 93 deletions(-) create mode 100644 pkg/plugin/processor/procutils/schema.go diff --git a/go.mod b/go.mod index 3cc860109..71ff568a6 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/NYTimes/gziphandler v1.1.1 github.com/bufbuild/buf v1.35.1 - github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85 + github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964 github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240731132439-34be835edabf @@ -16,8 +16,8 @@ require ( github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980 github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 - github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002 - github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd + github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257 + github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 github.com/conduitio/yaml/v3 v3.3.0 github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d @@ -42,7 +42,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.7.3 - github.com/twmb/franz-go/pkg/sr v1.0.1 + github.com/twmb/franz-go/pkg/sr v1.0.0 github.com/twmb/go-cache v1.2.1 go.uber.org/mock v0.4.0 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 diff --git a/go.sum b/go.sum index 255b581da..776e51c5d 100644 --- a/go.sum +++ b/go.sum @@ -211,8 +211,8 @@ github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8Vh github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= -github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85 h1:rue6a80Vyk7n+wOzbz57kRf+vzN0NRJ5fxWB5aePesI= -github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85/go.mod h1:/RhBhPRgX0cbTA1KygZTQX0vK7d4LQeKQessNF79wYc= +github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964 h1:IDtxmeMzxnWttV8lOGaG5c5w8Q4N6NyB6E46W5OQm4Y= +github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964/go.mod h1:G07iD8aJRD3s1pvO+t8yv7/vMeKbMoSGFhZfemdLT8Y= github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf h1:fw4Y61EzvRPJ4bqthhQ4IMnRtu2ZOF/B81yFfABPOl8= github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf/go.mod h1:bCnmA+29l871cNhroZfiCS2O8+GhBNVECfL5DOof2ew= github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58= @@ -227,10 +227,10 @@ github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67a github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980/go.mod h1:GyI6kkdR55JGM/96v5OSI7vlVodur3L22SY+OJbPd0s= github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 h1:tur/pSyX1RLzkxiBwhsV1qa6wP60pb20hJMptH5RRJY= github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46/go.mod h1:m+pf2cMF+qCwhMj9gUBV1BPGLPYauhtYkj2zFddfvdE= -github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002 h1:blkPwC39ZnLxleOoP10NCrBa6rMxY637F+kGXuaUfkY= -github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002/go.mod h1:FXwte9pgSeO0Fwr8FoFZgvuU3lvUeFUmoo1KswCl8KU= -github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y= -github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257 h1:FhROctjgcia8NEYqhaztAPfS9YFk7DV/sBXim1K2HXg= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257/go.mod h1:FXwte9pgSeO0Fwr8FoFZgvuU3lvUeFUmoo1KswCl8KU= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 h1:TCHq3L/LS9Ngo2a4h39vSNpXXisTox1l5uXdwPxxtNM= +github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71/go.mod h1:n6VqVO07olTlvIUSHf2kZcU8cgu2jGmuO6bFrQST2v8= github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 h1:/OBjxI1JjE3AmifouogZ2KvlhGJ9tQGk4X7UxwjHo1o= github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588/go.mod h1:G5t9W5Z5Mn0nW1TNnIQ1al4piqRXjc1R7HjdHgGFCx4= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= @@ -849,8 +849,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= -github.com/twmb/franz-go/pkg/sr v1.0.1 h1:hf3eRFDUWSfmR7JQCS/3JiqZEQwqbiDSS/DooewMHCE= -github.com/twmb/franz-go/pkg/sr v1.0.1/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c= +github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs= +github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c= github.com/twmb/go-cache v1.2.1 h1:yUkLutow4S2x5NMbqFW24o14OsucoFI5Fzmlb6uBinM= github.com/twmb/go-cache v1.2.1/go.mod h1:lArg9KhCl+GTFMikitLGhIBh/i11OK0lhSveqlMbbrY= github.com/ultraware/funlen v0.1.0 h1:BuqclbkY6pO+cvxoq7OsktIXZpgBSkYTQtmwhAK81vI= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index fcc60e5c1..500060fb2 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -52,6 +52,7 @@ import ( conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone" proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor" proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin" + "github.com/conduitio/conduit/pkg/plugin/processor/procutils" proc_standalone "github.com/conduitio/conduit/pkg/plugin/processor/standalone" "github.com/conduitio/conduit/pkg/processor" "github.com/conduitio/conduit/pkg/provisioning" @@ -96,8 +97,9 @@ type Runtime struct { connectorPluginService *conn_plugin.PluginService processorPluginService *proc_plugin.PluginService - schemaService *connutils.SchemaService + connSchemaService *connutils.SchemaService connectorPersister *connector.Persister + procSchemaService *procutils.SchemaService logger log.CtxLogger gRPCStatsHandler *promgrpc.StatsHandler @@ -164,7 +166,13 @@ func NewRuntime(cfg Config) (*Runtime, error) { // Create all necessary internal services func createServices(r *Runtime) error { - standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path) + schemaRegistry, err := createSchemaRegistry(r.Config, r.logger, r.DB) + if err != nil { + return cerrors.Errorf("failed to create schema registry: %w", err) + } + + procSchemaService := procutils.NewSchemaService(r.logger, schemaRegistry) + standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path, procSchemaService) if err != nil { return cerrors.Errorf("failed creating processor registry: %w", err) } @@ -175,19 +183,15 @@ func createServices(r *Runtime) error { standaloneReg, ) - schemaRegistry, err := createSchemaRegistry(r.Config, r.logger, r.DB) - if err != nil { - return cerrors.Errorf("failed to create schema registry: %w", err) - } tokenService := connutils.NewAuthManager() - schemaService := connutils.NewSchemaService(r.logger, schemaRegistry, tokenService) + connSchemaService := connutils.NewSchemaService(r.logger, schemaRegistry, tokenService) connPluginService := conn_plugin.NewPluginService( r.logger, conn_builtin.NewRegistry( r.logger, r.Config.ConnectorPlugins, - schemaService, + connSchemaService, ), conn_standalone.NewRegistry(r.logger, r.Config.Connectors.Path), tokenService, @@ -210,7 +214,8 @@ func createServices(r *Runtime) error { r.processorService = procService r.connectorPluginService = connPluginService r.processorPluginService = procPluginService - r.schemaService = schemaService + r.connSchemaService = connSchemaService + r.procSchemaService = procSchemaService return nil } @@ -480,7 +485,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad grpc.StatsHandler(r.gRPCStatsHandler), ) - schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.schemaService) + schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.connSchemaService) connutilsv1.RegisterSchemaServiceServer(grpcServer, schemaServiceAPI) // Makes it easier to use command line tools to interact @@ -491,7 +496,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad // Names taken from schema.proto healthServer := api.NewHealthServer( map[string]api.Checker{ - "SchemaService": r.schemaService, + "SchemaService": r.connSchemaService, }, r.logger, ) diff --git a/pkg/plugin/connector/connutils/schema.go b/pkg/plugin/connector/connutils/schema.go index 3771d3985..9a56cab76 100644 --- a/pkg/plugin/connector/connutils/schema.go +++ b/pkg/plugin/connector/connutils/schema.go @@ -28,8 +28,8 @@ import ( ) type SchemaService struct { - registry schemaregistry.Registry - tokenService *AuthManager + registry schemaregistry.Registry + authManager *AuthManager logger log.CtxLogger } @@ -39,12 +39,12 @@ var _ pconnutils.SchemaService = (*SchemaService)(nil) func NewSchemaService( logger log.CtxLogger, registry schemaregistry.Registry, - tokenService *AuthManager, + authManager *AuthManager, ) *SchemaService { return &SchemaService{ - registry: registry, - logger: logger.WithComponent("connutils.SchemaService"), - tokenService: tokenService, + registry: registry, + logger: logger.WithComponent("connutils.SchemaService"), + authManager: authManager, } } @@ -57,7 +57,7 @@ func (s *SchemaService) Check(ctx context.Context) error { } func (s *SchemaService) CreateSchema(ctx context.Context, req pconnutils.CreateSchemaRequest) (pconnutils.CreateSchemaResponse, error) { - err := s.tokenService.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx)) + err := s.authManager.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx)) if err != nil { return pconnutils.CreateSchemaResponse{}, err } @@ -79,7 +79,7 @@ func (s *SchemaService) CreateSchema(ctx context.Context, req pconnutils.CreateS } func (s *SchemaService) GetSchema(ctx context.Context, req pconnutils.GetSchemaRequest) (pconnutils.GetSchemaResponse, error) { - err := s.tokenService.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx)) + err := s.authManager.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx)) if err != nil { return pconnutils.GetSchemaResponse{}, err } @@ -100,8 +100,7 @@ func (s *SchemaService) GetSchema(ctx context.Context, req pconnutils.GetSchemaR func unwrapSrError(e *sr.ResponseError) error { switch e.ErrorCode { - case conduitschemaregistry.ErrorCodeSubjectNotFound, - conduitschemaregistry.ErrorCodeSchemaNotFound: + case conduitschemaregistry.ErrorCodeSubjectNotFound: return pconnutils.ErrSubjectNotFound case conduitschemaregistry.ErrorCodeVersionNotFound: return pconnutils.ErrVersionNotFound diff --git a/pkg/plugin/processor/procutils/schema.go b/pkg/plugin/processor/procutils/schema.go new file mode 100644 index 000000000..98aa9e99e --- /dev/null +++ b/pkg/plugin/processor/procutils/schema.go @@ -0,0 +1,95 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package procutils + +import ( + "context" + + "github.com/conduitio/conduit-processor-sdk/pprocutils" + conduitschemaregistry "github.com/conduitio/conduit-schema-registry" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/schemaregistry" + "github.com/conduitio/conduit/pkg/schemaregistry/fromschema" + "github.com/conduitio/conduit/pkg/schemaregistry/toschema" + "github.com/twmb/franz-go/pkg/sr" +) + +type SchemaService struct { + registry schemaregistry.Registry + logger log.CtxLogger +} + +var _ pprocutils.SchemaService = (*SchemaService)(nil) + +func NewSchemaService(logger log.CtxLogger, registry schemaregistry.Registry) *SchemaService { + return &SchemaService{ + registry: registry, + logger: logger.WithComponent("procutils.SchemaService"), + } +} + +func (s *SchemaService) Check(ctx context.Context) error { + r, ok := s.registry.(schemaregistry.RegistryWithCheck) + if !ok { + return nil + } + return r.Check(ctx) +} + +func (s *SchemaService) CreateSchema(ctx context.Context, req pprocutils.CreateSchemaRequest) (pprocutils.CreateSchemaResponse, error) { + ss, err := s.registry.CreateSchema(ctx, req.Subject, sr.Schema{ + Schema: string(req.Bytes), + Type: fromschema.SrSchemaType(req.Type), + }) + if err != nil { + var respErr *sr.ResponseError + if cerrors.As(err, &respErr) { + return pprocutils.CreateSchemaResponse{}, unwrapSrError(respErr) + } + return pprocutils.CreateSchemaResponse{}, cerrors.Errorf("failed to create schema: %w", err) + } + return pprocutils.CreateSchemaResponse{ + Schema: toschema.SrSubjectSchema(ss), + }, nil +} + +func (s *SchemaService) GetSchema(ctx context.Context, req pprocutils.GetSchemaRequest) (pprocutils.GetSchemaResponse, error) { + ss, err := s.registry.SchemaBySubjectVersion(ctx, req.Subject, req.Version) + if err != nil { + var respErr *sr.ResponseError + if cerrors.As(err, &respErr) { + return pprocutils.GetSchemaResponse{}, unwrapSrError(respErr) + } + return pprocutils.GetSchemaResponse{}, cerrors.Errorf("failed to get schema by subject and version: %w", err) + } + + return pprocutils.GetSchemaResponse{ + Schema: toschema.SrSubjectSchema(ss), + }, nil +} + +func unwrapSrError(e *sr.ResponseError) error { + switch e.ErrorCode { + case conduitschemaregistry.ErrorCodeSubjectNotFound: + return pprocutils.ErrSubjectNotFound + case conduitschemaregistry.ErrorCodeVersionNotFound: + return pprocutils.ErrVersionNotFound + case conduitschemaregistry.ErrorCodeInvalidSchema: + return pprocutils.ErrInvalidSchema + default: + return e + } +} diff --git a/pkg/plugin/processor/standalone/host_module.go b/pkg/plugin/processor/standalone/host_module.go index b0e1abcc8..caa21a9b1 100644 --- a/pkg/plugin/processor/standalone/host_module.go +++ b/pkg/plugin/processor/standalone/host_module.go @@ -15,10 +15,14 @@ package standalone import ( + "bytes" "context" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/fromproto" + "github.com/conduitio/conduit-processor-sdk/pprocutils/v1/toproto" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" - "github.com/conduitio/conduit-processor-sdk/wasm" + procutilsv1 "github.com/conduitio/conduit-processor-sdk/proto/procutils/v1" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/stealthrocket/wazergo" @@ -31,6 +35,8 @@ import ( var hostModule wazergo.HostModule[*hostModuleInstance] = hostModuleFunctions{ "command_request": wazergo.F1((*hostModuleInstance).commandRequest), "command_response": wazergo.F1((*hostModuleInstance).commandResponse), + "create_schema": wazergo.F1((*hostModuleInstance).createSchema), + "get_schema": wazergo.F1((*hostModuleInstance).getSchema), } // hostModuleFunctions type implements HostModule, providing the module name, @@ -51,7 +57,10 @@ func (f hostModuleFunctions) Functions() wazergo.Functions[*hostModuleInstance] // Instantiate creates a new instance of the module. This is called by the // runtime when a new instance of the module is created. func (f hostModuleFunctions) Instantiate(_ context.Context, opts ...hostModuleOption) (*hostModuleInstance, error) { - mod := &hostModuleInstance{} + mod := &hostModuleInstance{ + parkedResponses: make(map[string]proto.Message), + lastRequestBytes: make(map[string][]byte), + } wazergo.Configure(mod, opts...) if mod.commandRequests == nil { return nil, cerrors.New("missing command requests channel") @@ -68,11 +77,13 @@ func hostModuleOptions( logger log.CtxLogger, requests <-chan *processorv1.CommandRequest, responses chan<- tuple[*processorv1.CommandResponse, error], + schemaService pprocutils.SchemaService, ) hostModuleOption { return wazergo.OptionFunc(func(m *hostModuleInstance) { m.logger = logger m.commandRequests = requests m.commandResponses = responses + m.schemaService = schemaService }) } @@ -81,8 +92,11 @@ type hostModuleInstance struct { logger log.CtxLogger commandRequests <-chan *processorv1.CommandRequest commandResponses chan<- tuple[*processorv1.CommandResponse, error] + schemaService pprocutils.SchemaService parkedCommandRequest *processorv1.CommandRequest + parkedResponses map[string]proto.Message + lastRequestBytes map[string][]byte } func (*hostModuleInstance) Close(context.Context) error { return nil } @@ -101,7 +115,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes var ok bool m.parkedCommandRequest, ok = <-m.commandRequests if !ok { - return wasm.ErrorCodeNoMoreCommands + return pprocutils.ErrorCodeNoMoreCommands } } @@ -121,7 +135,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], m.parkedCommandRequest) if err != nil { m.logger.Err(ctx, err).Msg("failed marshalling protobuf command request") - return wasm.ErrorCodeUnknownCommandRequest + return pprocutils.ErrorCodeUnknownCommandRequest } m.parkedCommandRequest = nil @@ -139,9 +153,112 @@ func (m *hostModuleInstance) commandResponse(ctx context.Context, buf types.Byte if err != nil { m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf command response") m.commandResponses <- tuple[*processorv1.CommandResponse, error]{nil, err} - return wasm.ErrorCodeUnknownCommandResponse + return pprocutils.ErrorCodeUnknownCommandResponse } m.commandResponses <- tuple[*processorv1.CommandResponse, error]{&resp, nil} return 0 } + +// handleWasmRequest is a helper function that handles WASM requests. It +// unmarshalls the request, calls the service function, and marshals the response. +// If the buffer is too small, it parks the response and returns the size of the +// response. The next call to this method will return the same response. +func (m *hostModuleInstance) handleWasmRequest( + ctx context.Context, + buf types.Bytes, + serviceMethod string, + serviceFunc func(ctx context.Context, buf types.Bytes) (proto.Message, error), +) types.Uint32 { + lastRequestBytes := m.lastRequestBytes[serviceMethod] + parkedResponse := m.parkedResponses[serviceMethod] + + // no request/response parked or the buffer contains a new request + if parkedResponse == nil || + (len(buf) >= len(lastRequestBytes) && + !bytes.Equal(lastRequestBytes, buf[:len(lastRequestBytes)])) { + resp, err := serviceFunc(ctx, buf) + if err != nil { + m.logger.Err(ctx, err). + Str("method", serviceMethod). + Msg("failed executing service method") + var pErr *pprocutils.Error + if cerrors.As(err, &pErr) { + return types.Uint32(pErr.ErrCode) + } + return pprocutils.ErrorCodeInternal + } + lastRequestBytes = buf + parkedResponse = resp + + m.lastRequestBytes[serviceMethod] = lastRequestBytes + m.parkedResponses[serviceMethod] = parkedResponse + } + + // If the buffer is too small, we park the response and return the size of the + // response. The next call to this method will return the same response. + if size := proto.Size(parkedResponse); len(buf) < size { + m.logger.Warn(ctx). + Int("response_bytes", size). + Int("allocated_bytes", len(buf)). + Msgf("insufficient memory, response will be parked until next call to %s", serviceMethod) + return types.Uint32(size) + } + + out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], parkedResponse) + if err != nil { + m.logger.Err(ctx, err).Msg("failed marshalling protobuf create schema response") + return pprocutils.ErrorCodeInternal + } + + m.lastRequestBytes[serviceMethod] = nil + m.parkedResponses[serviceMethod] = nil + + return types.Uint32(len(out)) +} + +// createSchema is the exported function that is called by the WASM module to +// create a schema and set the buffer bytes to the marshalled response. +// It returns the response size on success, or an error code on error. +func (m *hostModuleInstance) createSchema(ctx context.Context, buf types.Bytes) types.Uint32 { + m.logger.Trace(ctx).Msg("executing create_schema") + + serviceFunc := func(ctx context.Context, buf types.Bytes) (proto.Message, error) { + var req procutilsv1.CreateSchemaRequest + err := proto.Unmarshal(buf, &req) + if err != nil { + m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf create schema request") + return nil, err + } + schemaResp, err := m.schemaService.CreateSchema(ctx, fromproto.CreateSchemaRequest(&req)) + if err != nil { + return nil, err + } + return toproto.CreateSchemaResponse(schemaResp), nil + } + + return m.handleWasmRequest(ctx, buf, "create_schema", serviceFunc) +} + +// getSchema is the exported function that is called by the WASM module to +// get a schema and set the buffer bytes to the marshalled response. +// It returns the response size on success, or an error code on error. +func (m *hostModuleInstance) getSchema(ctx context.Context, buf types.Bytes) types.Uint32 { + m.logger.Trace(ctx).Msg("executing get_schema") + + serviceFunc := func(ctx context.Context, buf types.Bytes) (proto.Message, error) { + var req procutilsv1.GetSchemaRequest + err := proto.Unmarshal(buf, &req) + if err != nil { + m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf get schema request") + return nil, err + } + schemaResp, err := m.schemaService.GetSchema(ctx, fromproto.GetSchemaRequest(&req)) + if err != nil { + return nil, err + } + return toproto.GetSchemaResponse(schemaResp), nil + } + + return m.handleWasmRequest(ctx, buf, "get_schema", serviceFunc) +} diff --git a/pkg/plugin/processor/standalone/processor.go b/pkg/plugin/processor/standalone/processor.go index 6b59e7f62..aab7e8ef2 100644 --- a/pkg/plugin/processor/standalone/processor.go +++ b/pkg/plugin/processor/standalone/processor.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/pprocutils" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" @@ -77,6 +78,7 @@ func newWASMProcessor( runtime wazero.Runtime, processorModule wazero.CompiledModule, hostModule *wazergo.CompiledModule[*hostModuleInstance], + schemaService pprocutils.SchemaService, id string, logger log.CtxLogger, @@ -97,6 +99,7 @@ func newWASMProcessor( logger, commandRequests, commandResponses, + schemaService, ), ) if err != nil { diff --git a/pkg/plugin/processor/standalone/processor_test.go b/pkg/plugin/processor/standalone/processor_test.go index fba6149bd..c135e196b 100644 --- a/pkg/plugin/processor/standalone/processor_test.go +++ b/pkg/plugin/processor/standalone/processor_test.go @@ -20,7 +20,8 @@ import ( "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/wasm" + "github.com/conduitio/conduit-processor-sdk/pprocutils" + "github.com/conduitio/conduit-processor-sdk/schema" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" @@ -32,7 +33,7 @@ func TestWASMProcessor_Specification_Success(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) gotSpec, err := underTest.Specification() @@ -49,11 +50,11 @@ func TestWASMProcessor_Specification_Error(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, SpecifyErrorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, SpecifyErrorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) _, err = underTest.Specification() - is.Equal(err, wasm.NewError(0, "boom")) + is.Equal(err, pprocutils.NewError(0, "boom")) // Teardown still works is.NoErr(underTest.Teardown(ctx)) @@ -64,7 +65,7 @@ func TestWASMProcessor_Configure_Success(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Configure(ctx, nil) @@ -78,11 +79,11 @@ func TestWASMProcessor_Configure_Error(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Configure(ctx, map[string]string{"configure": "error"}) - is.Equal(err, wasm.NewError(0, "boom")) + is.Equal(err, pprocutils.NewError(0, "boom")) // Teardown still works is.NoErr(underTest.Teardown(ctx)) @@ -93,7 +94,7 @@ func TestWASMProcessor_Configure_Panic(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Configure(ctx, map[string]string{"configure": "panic"}) @@ -109,7 +110,7 @@ func TestWASMProcessor_Open_Success(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Open(ctx) @@ -123,14 +124,14 @@ func TestWASMProcessor_Open_Error(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Configure(ctx, map[string]string{"open": "error"}) is.NoErr(err) err = underTest.Open(ctx) - is.Equal(err, wasm.NewError(0, "boom")) + is.Equal(err, pprocutils.NewError(0, "boom")) // Teardown still works is.NoErr(underTest.Teardown(ctx)) @@ -141,7 +142,7 @@ func TestWASMProcessor_Open_Panic(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) err = underTest.Configure(ctx, map[string]string{"open": "panic"}) @@ -160,7 +161,7 @@ func TestWASMProcessor_Process_Success(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) is.NoErr(underTest.Configure(ctx, map[string]string{"process.prefix": "hello!\n\n"})) @@ -197,7 +198,7 @@ func TestWASMProcessor_Process_Error(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) is.NoErr(underTest.Configure(ctx, map[string]string{"process": "error"})) @@ -207,7 +208,7 @@ func TestWASMProcessor_Process_Error(t *testing.T) { errRecord, ok := processed[0].(sdk.ErrorRecord) is.True(ok) - is.Equal(errRecord.Error, wasm.NewError(0, "boom")) + is.Equal(errRecord.Error, pprocutils.NewError(0, "boom")) // Teardown still works is.NoErr(underTest.Teardown(ctx)) @@ -218,7 +219,7 @@ func TestWASMProcessor_Process_Panic(t *testing.T) { ctx := context.Background() logger := log.Test(t) - underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) is.NoErr(err) is.NoErr(underTest.Configure(ctx, map[string]string{"process": "panic"})) @@ -234,3 +235,18 @@ func TestWASMProcessor_Process_Panic(t *testing.T) { err = underTest.Teardown(ctx) is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) } + +func TestWASMProcessor_Configure_Schema_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, schema.NewInMemoryService(), "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, map[string]string{"configure": "create_and_get_schema"}) + is.NoErr(err) + + // Teardown still works + is.NoErr(underTest.Teardown(ctx)) +} diff --git a/pkg/plugin/processor/standalone/proto.go b/pkg/plugin/processor/standalone/proto.go index 4ffceabee..9a02259ea 100644 --- a/pkg/plugin/processor/standalone/proto.go +++ b/pkg/plugin/processor/standalone/proto.go @@ -21,8 +21,8 @@ import ( "github.com/conduitio/conduit-commons/opencdc" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/pprocutils" processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" - "github.com/conduitio/conduit-processor-sdk/wasm" ) // protoConverter converts between the SDK and protobuf types. @@ -122,5 +122,5 @@ func (c protoConverter) errorRecord(in *processorv1.Process_ProcessedRecord_Erro } func (c protoConverter) error(e *processorv1.Error) error { - return wasm.NewError(e.Code, e.Message) + return pprocutils.NewError(e.Code, e.Message) } diff --git a/pkg/plugin/processor/standalone/registry.go b/pkg/plugin/processor/standalone/registry.go index 039cc9d2d..e00ba0f39 100644 --- a/pkg/plugin/processor/standalone/registry.go +++ b/pkg/plugin/processor/standalone/registry.go @@ -23,6 +23,7 @@ import ( "sync" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/pprocutils" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" @@ -45,6 +46,8 @@ type Registry struct { // multiple times, once for each WASM module. hostModule *wazergo.CompiledModule[*hostModuleInstance] + schemaService pprocutils.SchemaService + // plugins stores plugin blueprints in a 2D map, first key is the plugin // name, the second key is the plugin version plugins map[string]map[string]blueprint @@ -61,7 +64,7 @@ type blueprint struct { // ensure someone can't switch the plugin after we registered it } -func NewRegistry(logger log.CtxLogger, pluginDir string) (*Registry, error) { +func NewRegistry(logger log.CtxLogger, pluginDir string, schemaService pprocutils.SchemaService) (*Registry, error) { // context is only used for logging, it's not used for long running operations ctx := context.Background() @@ -95,10 +98,11 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) (*Registry, error) { } r := &Registry{ - logger: logger, - runtime: runtime, - hostModule: compiledHostModule, - pluginDir: pluginDir, + logger: logger, + runtime: runtime, + hostModule: compiledHostModule, + pluginDir: pluginDir, + schemaService: schemaService, } r.reloadPlugins() @@ -127,7 +131,7 @@ func (r *Registry) NewProcessor(ctx context.Context, fullName plugin.FullName, i return nil, cerrors.Errorf("could not find standalone processor plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound) } - p, err := newWASMProcessor(ctx, r.runtime, bp.module, r.hostModule, id, r.logger) + p, err := newWASMProcessor(ctx, r.runtime, bp.module, r.hostModule, r.schemaService, id, r.logger) if err != nil { return nil, cerrors.Errorf("failed to create a new WASM processor: %w", err) } @@ -249,7 +253,7 @@ func (r *Registry) loadBlueprint(ctx context.Context, path string) (blueprint, e } }() - p, err := newWASMProcessor(ctx, r.runtime, module, r.hostModule, "init-processor", log.Nop()) + p, err := newWASMProcessor(ctx, r.runtime, module, r.hostModule, r.schemaService, "init-processor", log.Nop()) if err != nil { return blueprint{}, fmt.Errorf("failed to create a new WASM processor: %w", err) } diff --git a/pkg/plugin/processor/standalone/registry_test.go b/pkg/plugin/processor/standalone/registry_test.go index f0aad2556..d505c7e13 100644 --- a/pkg/plugin/processor/standalone/registry_test.go +++ b/pkg/plugin/processor/standalone/registry_test.go @@ -23,6 +23,7 @@ import ( "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/schema" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" @@ -33,7 +34,7 @@ import ( func TestRegistry_List(t *testing.T) { is := is.New(t) - underTest, err := NewRegistry(log.Test(t), testPluginChaosDir) + underTest, err := NewRegistry(log.Test(t), testPluginChaosDir, schema.NewInMemoryService()) is.NoErr(err) list := underTest.List() is.Equal(1, len(list)) @@ -48,7 +49,7 @@ func TestRegistry_List(t *testing.T) { func TestRegistry_MalformedProcessor(t *testing.T) { is := is.New(t) - underTest, err := NewRegistry(log.Test(t), testPluginMalformedDir) + underTest, err := NewRegistry(log.Test(t), testPluginMalformedDir, schema.NewInMemoryService()) is.NoErr(err) list := underTest.List() is.Equal(0, len(list)) @@ -57,7 +58,7 @@ func TestRegistry_MalformedProcessor(t *testing.T) { func TestRegistry_SpecifyError(t *testing.T) { is := is.New(t) - underTest, err := NewRegistry(log.Test(t), testPluginSpecifyErrorDir) + underTest, err := NewRegistry(log.Test(t), testPluginSpecifyErrorDir, schema.NewInMemoryService()) is.NoErr(err) list := underTest.List() is.Equal(0, len(list)) @@ -68,7 +69,7 @@ func TestRegistry_ChaosProcessor(t *testing.T) { is := is.New(t) // reuse this registry for multiple tests, because it's expensive to create - underTest, err := NewRegistry(log.Nop(), testPluginChaosDir) + underTest, err := NewRegistry(log.Nop(), testPluginChaosDir, schema.NewInMemoryService()) is.NoErr(err) const standaloneProcessorName = plugin.FullName("standalone:chaos-processor@v1.3.5") diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go index fae071ff2..d50850d2a 100644 --- a/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go +++ b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go @@ -17,12 +17,16 @@ package main import ( + "bytes" "context" "errors" + "fmt" "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-commons/schema" sdk "github.com/conduitio/conduit-processor-sdk" + procschema "github.com/conduitio/conduit-processor-sdk/schema" ) func main() { @@ -66,36 +70,18 @@ func (p *chaosProcessor) Specification() (sdk.Specification, error) { }, nil } -func (p *chaosProcessor) Configure(_ context.Context, cfg map[string]string) error { +func (p *chaosProcessor) Configure(ctx context.Context, cfg map[string]string) error { p.cfg = cfg - err := p.methodBehavior("configure") - if err != nil { - return err - } - - return nil -} - -func (p *chaosProcessor) Open(context.Context) error { - return p.methodBehavior("open") + return p.methodBehavior(ctx, "configure") } -func (p *chaosProcessor) methodBehavior(name string) error { - switch p.cfg[name] { - case "error": - return errors.New("boom") - case "panic": - panic(name + " panic") - case "", "success": - return nil - default: - panic("unknown mode: " + p.cfg[name]) - } +func (p *chaosProcessor) Open(ctx context.Context) error { + return p.methodBehavior(ctx, "open") } -func (p *chaosProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord { - err := p.methodBehavior("process") +func (p *chaosProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + err := p.methodBehavior(ctx, "process") if err != nil { // on error we return a single record with the error return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} @@ -117,6 +103,62 @@ func (p *chaosProcessor) Process(_ context.Context, records []opencdc.Record) [] return out } -func (p *chaosProcessor) Teardown(context.Context) error { - return p.methodBehavior("teardown") +func (p *chaosProcessor) Teardown(ctx context.Context) error { + return p.methodBehavior(ctx, "teardown") +} + +func (p *chaosProcessor) methodBehavior(ctx context.Context, name string) error { + switch p.cfg[name] { + case "error": + return errors.New("boom") + case "panic": + panic(name + " panic") + case "create_and_get_schema": + want := schema.Schema{ + Subject: "chaosProcessor", + Version: 1, + Type: schema.TypeAvro, + Bytes: []byte("int"), + } + + sch1, err := procschema.Create(ctx, want.Type, want.Subject, want.Bytes) + if err != nil { + return fmt.Errorf("failed to create schema: %w", err) + } + switch { + case sch1.ID == 0: + return fmt.Errorf("id is 0") + case sch1.Subject != want.Subject: + return fmt.Errorf("subjects do not match: %v != %v", sch1.Subject, want.Subject) + case sch1.Version != want.Version: + return fmt.Errorf("versions do not match: %v != %v", sch1.Version, want.Version) + case sch1.Type != want.Type: + return fmt.Errorf("types do not match: %v != %v", sch1.Type, want.Type) + case !bytes.Equal(sch1.Bytes, want.Bytes): + return fmt.Errorf("schemas do not match: %s != %s", sch1.Bytes, want.Bytes) + } + + sch2, err := procschema.Get(ctx, sch1.Subject, sch1.Version) + if err != nil { + return fmt.Errorf("failed to get schema: %w", err) + } + switch { + case sch1.ID != sch2.ID: + return fmt.Errorf("ids do not match: %v != %v", sch1.ID, sch2.ID) + case sch1.Subject != sch2.Subject: + return fmt.Errorf("subjects do not match: %v != %v", sch1.Subject, sch2.Subject) + case sch1.Version != sch2.Version: + return fmt.Errorf("versions do not match: %v != %v", sch1.Version, sch2.Version) + case sch1.Type != sch2.Type: + return fmt.Errorf("types do not match: %v != %v", sch1.Type, sch2.Type) + case !bytes.Equal(sch1.Bytes, sch2.Bytes): + return fmt.Errorf("schemas do not match: %s != %s", sch1.Bytes, sch2.Bytes) + } + + return nil + case "", "success": + return nil + default: + panic("unknown mode: " + p.cfg[name]) + } } From 2a99a9fc051f2ee612fd14cb2fbd867b0cda2ab9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 1 Aug 2024 09:58:55 +0000 Subject: [PATCH 2/3] go.mod: bump github.com/twmb/franz-go/pkg/sr from 1.0.0 to 1.0.1 (#1738) Bumps [github.com/twmb/franz-go/pkg/sr](https://github.com/twmb/franz-go) from 1.0.0 to 1.0.1. - [Changelog](https://github.com/twmb/franz-go/blob/master/CHANGELOG.md) - [Commits](https://github.com/twmb/franz-go/compare/v1.0.0...pkg/sr/v1.0.1) --- updated-dependencies: - dependency-name: github.com/twmb/franz-go/pkg/sr dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 71ff568a6..de54111db 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.7.3 - github.com/twmb/franz-go/pkg/sr v1.0.0 + github.com/twmb/franz-go/pkg/sr v1.0.1 github.com/twmb/go-cache v1.2.1 go.uber.org/mock v0.4.0 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 diff --git a/go.sum b/go.sum index 776e51c5d..a51972319 100644 --- a/go.sum +++ b/go.sum @@ -849,8 +849,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= -github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs= -github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c= +github.com/twmb/franz-go/pkg/sr v1.0.1 h1:hf3eRFDUWSfmR7JQCS/3JiqZEQwqbiDSS/DooewMHCE= +github.com/twmb/franz-go/pkg/sr v1.0.1/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c= github.com/twmb/go-cache v1.2.1 h1:yUkLutow4S2x5NMbqFW24o14OsucoFI5Fzmlb6uBinM= github.com/twmb/go-cache v1.2.1/go.mod h1:lArg9KhCl+GTFMikitLGhIBh/i11OK0lhSveqlMbbrY= github.com/ultraware/funlen v0.1.0 h1:BuqclbkY6pO+cvxoq7OsktIXZpgBSkYTQtmwhAK81vI= From ab0ecac44926ce880d4587f00afb380cf9346042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Thu, 1 Aug 2024 13:45:26 +0200 Subject: [PATCH 3/3] Mention storage options in README (#1730) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Mention storage options in README * linter --------- Co-authored-by: Raúl Barroso --- README.md | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 00edaa393..002827b74 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io). - [Quick start](#quick-start) - [Installation guide](#installation-guide) - [Configuring Conduit](#configuring-conduit) +- [Storage](#storage) - [Connectors](#connectors) - [Processors](#processors) - [API](#api) @@ -189,6 +190,34 @@ each configuration option based on the following priorities: connection-string: postgres://localhost:5432/conduitdb # -db.postgres.connection-string or CONDUIT_DB_POSTGRES_CONNECTION_STRING ``` +## Storage + +Conduit's own data (information about pipelines, connectors, etc.) can be stored +in the following databases: + +- BadgerDB (default) +- PostgreSQL +- SQLite + +It's also possible to store all the data in memory, which is sometimes useful +for development purposes. + +The database type used can be configured with the `db.type` parameter (through +any of the [configuration](#configuring-conduit) options in Conduit). +For example, the CLI flag to use a PostgresSQL database with Conduit is as +follows: `-db.type=postgres`. + +Changing database parameters (e.g. the PostgreSQL connection string) is done +through parameters of the following form: `db..`. For +example, the CLI flag to use a PostgreSQL instance listening on `localhost:5432` +would be: `-db.postgres.connection-string=postgres://localhost:5432/conduitdb`. + +The full example in our case would be: + +```shell +./conduit -db.type=postgres -db.postgres.connection-string="postgresql://localhost:5432/conduitdb" +``` + ## Connectors For the full list of available connectors, see @@ -215,7 +244,9 @@ Conduit ships with a number of built-in connectors: source/destination for AWS S3. - [Generator connector](https://github.com/ConduitIO/conduit-connector-generator) provides a source which generates random data (useful for testing). - +- [Log connector](https://github.com/ConduitIO/conduit-connector-log) + provides a destination which logs all records (useful for testing). + Additionally, we have prepared a [Kafka Connect wrapper](https://github.com/conduitio/conduit-kafka-connect-wrapper) that allows you to run any Apache Kafka Connect connector as part of a Conduit