Skip to content

Commit

Permalink
Merge branch 'raul/update-architecture-doc' of github.com:ConduitIO/c…
Browse files Browse the repository at this point in the history
…onduit into raul/update-architecture-doc
  • Loading branch information
raulb committed Aug 1, 2024
2 parents c524614 + cdca34d commit 0560ccd
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 91 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io).
- [Quick start](#quick-start)
- [Installation guide](#installation-guide)
- [Configuring Conduit](#configuring-conduit)
- [Storage](#storage)
- [Connectors](#connectors)
- [Processors](#processors)
- [API](#api)
Expand Down Expand Up @@ -189,6 +190,34 @@ each configuration option based on the following priorities:
connection-string: postgres://localhost:5432/conduitdb # -db.postgres.connection-string or CONDUIT_DB_POSTGRES_CONNECTION_STRING
```
## Storage
Conduit's own data (information about pipelines, connectors, etc.) can be stored
in the following databases:
- BadgerDB (default)
- PostgreSQL
- SQLite
It's also possible to store all the data in memory, which is sometimes useful
for development purposes.
The database type used can be configured with the `db.type` parameter (through
any of the [configuration](#configuring-conduit) options in Conduit).
For example, the CLI flag to use a PostgresSQL database with Conduit is as
follows: `-db.type=postgres`.

Changing database parameters (e.g. the PostgreSQL connection string) is done
through parameters of the following form: `db.<db type>.<parameter name>`. For
example, the CLI flag to use a PostgreSQL instance listening on `localhost:5432`
would be: `-db.postgres.connection-string=postgres://localhost:5432/conduitdb`.

The full example in our case would be:

```shell
./conduit -db.type=postgres -db.postgres.connection-string="postgresql://localhost:5432/conduitdb"
```

## Connectors

For the full list of available connectors, see
Expand All @@ -215,7 +244,9 @@ Conduit ships with a number of built-in connectors:
source/destination for AWS S3.
- [Generator connector](https://github.com/ConduitIO/conduit-connector-generator)
provides a source which generates random data (useful for testing).

- [Log connector](https://github.com/ConduitIO/conduit-connector-log)
provides a destination which logs all records (useful for testing).

Additionally, we have prepared
a [Kafka Connect wrapper](https://github.com/conduitio/conduit-kafka-connect-wrapper)
that allows you to run any Apache Kafka Connect connector as part of a Conduit
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.35.1
github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85
github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964
github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240731132439-34be835edabf
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588
github.com/conduitio/yaml/v3 v3.3.0
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8Vh
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85 h1:rue6a80Vyk7n+wOzbz57kRf+vzN0NRJ5fxWB5aePesI=
github.com/conduitio/conduit-commons v0.2.1-0.20240723194042-1db31d4f6d85/go.mod h1:/RhBhPRgX0cbTA1KygZTQX0vK7d4LQeKQessNF79wYc=
github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964 h1:IDtxmeMzxnWttV8lOGaG5c5w8Q4N6NyB6E46W5OQm4Y=
github.com/conduitio/conduit-commons v0.2.1-0.20240725195242-81748fb51964/go.mod h1:G07iD8aJRD3s1pvO+t8yv7/vMeKbMoSGFhZfemdLT8Y=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf h1:fw4Y61EzvRPJ4bqthhQ4IMnRtu2ZOF/B81yFfABPOl8=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240709112929-6207f9f8efcf/go.mod h1:bCnmA+29l871cNhroZfiCS2O8+GhBNVECfL5DOof2ew=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58=
Expand All @@ -227,10 +227,10 @@ github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67a
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240730102156-29a2e67ad980/go.mod h1:GyI6kkdR55JGM/96v5OSI7vlVodur3L22SY+OJbPd0s=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 h1:tur/pSyX1RLzkxiBwhsV1qa6wP60pb20hJMptH5RRJY=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46/go.mod h1:m+pf2cMF+qCwhMj9gUBV1BPGLPYauhtYkj2zFddfvdE=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002 h1:blkPwC39ZnLxleOoP10NCrBa6rMxY637F+kGXuaUfkY=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731123633-05f6d05cf002/go.mod h1:FXwte9pgSeO0Fwr8FoFZgvuU3lvUeFUmoo1KswCl8KU=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257 h1:FhROctjgcia8NEYqhaztAPfS9YFk7DV/sBXim1K2HXg=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240731175726-14885658b257/go.mod h1:FXwte9pgSeO0Fwr8FoFZgvuU3lvUeFUmoo1KswCl8KU=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 h1:TCHq3L/LS9Ngo2a4h39vSNpXXisTox1l5uXdwPxxtNM=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71/go.mod h1:n6VqVO07olTlvIUSHf2kZcU8cgu2jGmuO6bFrQST2v8=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 h1:/OBjxI1JjE3AmifouogZ2KvlhGJ9tQGk4X7UxwjHo1o=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588/go.mod h1:G5t9W5Z5Mn0nW1TNnIQ1al4piqRXjc1R7HjdHgGFCx4=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
Expand Down
27 changes: 16 additions & 11 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone"
proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor"
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
"github.com/conduitio/conduit/pkg/plugin/processor/procutils"
proc_standalone "github.com/conduitio/conduit/pkg/plugin/processor/standalone"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/provisioning"
Expand Down Expand Up @@ -96,8 +97,9 @@ type Runtime struct {
connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService

schemaService *connutils.SchemaService
connSchemaService *connutils.SchemaService
connectorPersister *connector.Persister
procSchemaService *procutils.SchemaService

logger log.CtxLogger
gRPCStatsHandler *promgrpc.StatsHandler
Expand Down Expand Up @@ -164,7 +166,13 @@ func NewRuntime(cfg Config) (*Runtime, error) {

// Create all necessary internal services
func createServices(r *Runtime) error {
standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path)
schemaRegistry, err := createSchemaRegistry(r.Config, r.logger, r.DB)
if err != nil {
return cerrors.Errorf("failed to create schema registry: %w", err)
}

procSchemaService := procutils.NewSchemaService(r.logger, schemaRegistry)
standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path, procSchemaService)
if err != nil {
return cerrors.Errorf("failed creating processor registry: %w", err)
}
Expand All @@ -175,19 +183,15 @@ func createServices(r *Runtime) error {
standaloneReg,
)

schemaRegistry, err := createSchemaRegistry(r.Config, r.logger, r.DB)
if err != nil {
return cerrors.Errorf("failed to create schema registry: %w", err)
}
tokenService := connutils.NewAuthManager()
schemaService := connutils.NewSchemaService(r.logger, schemaRegistry, tokenService)
connSchemaService := connutils.NewSchemaService(r.logger, schemaRegistry, tokenService)

connPluginService := conn_plugin.NewPluginService(
r.logger,
conn_builtin.NewRegistry(
r.logger,
r.Config.ConnectorPlugins,
schemaService,
connSchemaService,
),
conn_standalone.NewRegistry(r.logger, r.Config.Connectors.Path),
tokenService,
Expand All @@ -210,7 +214,8 @@ func createServices(r *Runtime) error {
r.processorService = procService
r.connectorPluginService = connPluginService
r.processorPluginService = procPluginService
r.schemaService = schemaService
r.connSchemaService = connSchemaService
r.procSchemaService = procSchemaService

return nil
}
Expand Down Expand Up @@ -480,7 +485,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad
grpc.StatsHandler(r.gRPCStatsHandler),
)

schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.schemaService)
schemaServiceAPI := pconnutils.NewSchemaServiceServer(r.connSchemaService)
connutilsv1.RegisterSchemaServiceServer(grpcServer, schemaServiceAPI)

// Makes it easier to use command line tools to interact
Expand All @@ -491,7 +496,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad
// Names taken from schema.proto
healthServer := api.NewHealthServer(
map[string]api.Checker{
"SchemaService": r.schemaService,
"SchemaService": r.connSchemaService,
},
r.logger,
)
Expand Down
19 changes: 9 additions & 10 deletions pkg/plugin/connector/connutils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

type SchemaService struct {
registry schemaregistry.Registry
tokenService *AuthManager
registry schemaregistry.Registry
authManager *AuthManager

logger log.CtxLogger
}
Expand All @@ -39,12 +39,12 @@ var _ pconnutils.SchemaService = (*SchemaService)(nil)
func NewSchemaService(
logger log.CtxLogger,
registry schemaregistry.Registry,
tokenService *AuthManager,
authManager *AuthManager,
) *SchemaService {
return &SchemaService{
registry: registry,
logger: logger.WithComponent("connutils.SchemaService"),
tokenService: tokenService,
registry: registry,
logger: logger.WithComponent("connutils.SchemaService"),
authManager: authManager,
}
}

Expand All @@ -57,7 +57,7 @@ func (s *SchemaService) Check(ctx context.Context) error {
}

func (s *SchemaService) CreateSchema(ctx context.Context, req pconnutils.CreateSchemaRequest) (pconnutils.CreateSchemaResponse, error) {
err := s.tokenService.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx))
err := s.authManager.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx))
if err != nil {
return pconnutils.CreateSchemaResponse{}, err
}
Expand All @@ -79,7 +79,7 @@ func (s *SchemaService) CreateSchema(ctx context.Context, req pconnutils.CreateS
}

func (s *SchemaService) GetSchema(ctx context.Context, req pconnutils.GetSchemaRequest) (pconnutils.GetSchemaResponse, error) {
err := s.tokenService.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx))
err := s.authManager.IsTokenValid(pconnutils.ConnectorTokenFromContext(ctx))
if err != nil {
return pconnutils.GetSchemaResponse{}, err
}
Expand All @@ -100,8 +100,7 @@ func (s *SchemaService) GetSchema(ctx context.Context, req pconnutils.GetSchemaR

func unwrapSrError(e *sr.ResponseError) error {
switch e.ErrorCode {
case conduitschemaregistry.ErrorCodeSubjectNotFound,
conduitschemaregistry.ErrorCodeSchemaNotFound:
case conduitschemaregistry.ErrorCodeSubjectNotFound:
return pconnutils.ErrSubjectNotFound
case conduitschemaregistry.ErrorCodeVersionNotFound:
return pconnutils.ErrVersionNotFound
Expand Down
95 changes: 95 additions & 0 deletions pkg/plugin/processor/procutils/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package procutils

import (
"context"

"github.com/conduitio/conduit-processor-sdk/pprocutils"
conduitschemaregistry "github.com/conduitio/conduit-schema-registry"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/schemaregistry"
"github.com/conduitio/conduit/pkg/schemaregistry/fromschema"
"github.com/conduitio/conduit/pkg/schemaregistry/toschema"
"github.com/twmb/franz-go/pkg/sr"
)

type SchemaService struct {
registry schemaregistry.Registry
logger log.CtxLogger
}

var _ pprocutils.SchemaService = (*SchemaService)(nil)

func NewSchemaService(logger log.CtxLogger, registry schemaregistry.Registry) *SchemaService {
return &SchemaService{
registry: registry,
logger: logger.WithComponent("procutils.SchemaService"),
}
}

func (s *SchemaService) Check(ctx context.Context) error {
r, ok := s.registry.(schemaregistry.RegistryWithCheck)
if !ok {
return nil
}
return r.Check(ctx)
}

func (s *SchemaService) CreateSchema(ctx context.Context, req pprocutils.CreateSchemaRequest) (pprocutils.CreateSchemaResponse, error) {
ss, err := s.registry.CreateSchema(ctx, req.Subject, sr.Schema{
Schema: string(req.Bytes),
Type: fromschema.SrSchemaType(req.Type),
})
if err != nil {
var respErr *sr.ResponseError
if cerrors.As(err, &respErr) {
return pprocutils.CreateSchemaResponse{}, unwrapSrError(respErr)
}
return pprocutils.CreateSchemaResponse{}, cerrors.Errorf("failed to create schema: %w", err)
}
return pprocutils.CreateSchemaResponse{
Schema: toschema.SrSubjectSchema(ss),
}, nil
}

func (s *SchemaService) GetSchema(ctx context.Context, req pprocutils.GetSchemaRequest) (pprocutils.GetSchemaResponse, error) {
ss, err := s.registry.SchemaBySubjectVersion(ctx, req.Subject, req.Version)
if err != nil {
var respErr *sr.ResponseError
if cerrors.As(err, &respErr) {
return pprocutils.GetSchemaResponse{}, unwrapSrError(respErr)
}
return pprocutils.GetSchemaResponse{}, cerrors.Errorf("failed to get schema by subject and version: %w", err)
}

return pprocutils.GetSchemaResponse{
Schema: toschema.SrSubjectSchema(ss),
}, nil
}

func unwrapSrError(e *sr.ResponseError) error {
switch e.ErrorCode {
case conduitschemaregistry.ErrorCodeSubjectNotFound:
return pprocutils.ErrSubjectNotFound
case conduitschemaregistry.ErrorCodeVersionNotFound:
return pprocutils.ErrVersionNotFound
case conduitschemaregistry.ErrorCodeInvalidSchema:
return pprocutils.ErrInvalidSchema
default:
return e
}
}
Loading

0 comments on commit 0560ccd

Please sign in to comment.