Skip to content

Commit

Permalink
prepend collection name to subject (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Aug 1, 2024
1 parent 1488565 commit 3065ee5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 10 deletions.
35 changes: 27 additions & 8 deletions source_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c SourceWithSchemaExtractionConfig) parameters() config.Parameters {
configSourceSchemaExtractionPayloadSubject: {
Default: *c.PayloadSubject,
Type: config.ParameterTypeString,
Description: `The subject of the payload schema. Defaults to "payload".`,
Description: `The subject of the payload schema. If the record metadata contains the field "opencdc.collection" it is prepended to the subject name and separated with a dot.`,
},
configSourceSchemaExtractionKeyEnabled: {
Default: strconv.FormatBool(*c.KeyEnabled),
Expand All @@ -153,7 +153,7 @@ func (c SourceWithSchemaExtractionConfig) parameters() config.Parameters {
configSourceSchemaExtractionKeySubject: {
Default: *c.KeySubject,
Type: config.ParameterTypeString,
Description: `The subject of the key schema. Defaults to "key".`,
Description: `The subject of the key schema. If the record metadata contains the field "opencdc.collection" it is prepended to the subject name and separated with a dot.`,
},
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (s *SourceWithSchemaExtraction) Wrap(impl Source) Source {
return &sourceWithSchemaExtraction{
Source: impl,
defaults: s.Config,
fingerprintCache: make(map[uint64]schema.Schema),
fingerprintCache: make(map[uint64]map[string]schema.Schema),
}
}

Expand All @@ -211,7 +211,7 @@ type sourceWithSchemaExtraction struct {
payloadSubject string
keySubject string

fingerprintCache map[uint64]schema.Schema
fingerprintCache map[uint64]map[string]schema.Schema
payloadWarnOnce sync.Once
keyWarnOnce sync.Once
}
Expand Down Expand Up @@ -292,7 +292,14 @@ func (s *sourceWithSchemaExtraction) encodeKey(ctx context.Context, rec *opencdc
return nil
}

sch, err := s.schemaForType(ctx, rec.Key, s.keySubject)
subject := s.keySubject
if rec.Metadata != nil {
if collection, err := rec.Metadata.GetCollection(); err == nil {
subject = collection + "." + subject
}
}

sch, err := s.schemaForType(ctx, rec.Key, subject)
if err != nil {
return fmt.Errorf("failed to extract schema for key: %w", err)
}
Expand Down Expand Up @@ -329,7 +336,14 @@ func (s *sourceWithSchemaExtraction) encodePayload(ctx context.Context, rec *ope
val = rec.Payload.Before
}

sch, err := s.schemaForType(ctx, val, s.payloadSubject)
subject := s.payloadSubject
if rec.Metadata != nil {
if collection, err := rec.Metadata.GetCollection(); err == nil {
subject = collection + "." + subject
}
}

sch, err := s.schemaForType(ctx, val, subject)
if err != nil {
return fmt.Errorf("failed to extract schema for payload: %w", err)
}
Expand Down Expand Up @@ -364,13 +378,18 @@ func (s *sourceWithSchemaExtraction) schemaForType(ctx context.Context, data any
schemaBytes := []byte(srd.String())
fp := rabin.Bytes(schemaBytes)

sch, ok := s.fingerprintCache[fp]
schemas, ok := s.fingerprintCache[fp]
if !ok {
schemas = make(map[string]schema.Schema)
s.fingerprintCache[fp] = schemas
}
sch, ok := schemas[subject]
if !ok {
sch, err = sdkschema.Create(ctx, s.schemaType, subject, schemaBytes)
if err != nil {
return schema.Schema{}, fmt.Errorf("failed to create schema: %w", err)
}
s.fingerprintCache[sch.Fingerprint()] = sch
schemas[subject] = sch
}

return sch, nil
Expand Down
48 changes: 46 additions & 2 deletions source_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
}

testCases := []struct {
name string
record opencdc.Record
name string
record opencdc.Record
wantKeySubject string
wantPayloadSubject string
}{{
name: "no key, no payload",
record: opencdc.Record{
Expand All @@ -207,6 +209,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw key",
record: opencdc.Record{
Expand All @@ -216,6 +220,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured key",
record: opencdc.Record{
Expand All @@ -225,6 +231,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw payload before",
record: opencdc.Record{
Expand All @@ -234,6 +242,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured payload before",
record: opencdc.Record{
Expand All @@ -242,6 +252,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
Before: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw payload after",
record: opencdc.Record{
Expand All @@ -251,6 +263,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: opencdc.RawData("this should not be encoded"),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured payload after",
record: opencdc.Record{
Expand All @@ -260,6 +274,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all structured",
record: opencdc.Record{
Expand All @@ -269,6 +285,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all raw",
record: opencdc.Record{
Expand All @@ -278,6 +296,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: opencdc.RawData("this should not be encoded"),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "key raw payload structured",
record: opencdc.Record{
Expand All @@ -287,6 +307,8 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "key structured payload raw",
record: opencdc.Record{
Expand All @@ -296,6 +318,22 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all structured with collection",
record: opencdc.Record{
Metadata: map[string]string{
opencdc.MetadataCollection: "foo",
},
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: testStructuredData.Clone(),
After: testStructuredData.Clone(),
},
},
wantKeySubject: "foo.key",
wantPayloadSubject: "foo.payload",
}}

for _, tc := range testCases {
Expand Down Expand Up @@ -326,6 +364,9 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
version, err := got.Metadata.GetKeySchemaVersion()
is.NoErr(err)

is.Equal(subject, tc.wantKeySubject)
is.Equal(version, 1)

sch, err := sdkschema.Get(ctx, subject, version)
is.NoErr(err)

Expand All @@ -349,6 +390,9 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
version, err := got.Metadata.GetPayloadSchemaVersion()
is.NoErr(err)

is.Equal(subject, tc.wantPayloadSubject)
is.Equal(version, 1)

sch, err := sdkschema.Get(ctx, subject, version)
is.NoErr(err)

Expand Down

0 comments on commit 3065ee5

Please sign in to comment.