Skip to content

Commit

Permalink
review feedback pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jun 14, 2024
1 parent ec3ea8a commit a51daba
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
5 changes: 4 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ func PullCdcRecords[Items model.Items](
if err != nil {
return err
}
records.AddRecord(ctx, rec)
err = records.AddRecord(ctx, rec)
if err != nil {
return err
}

if cdcRecordsStorage.Len() == 1 {
records.SignalAsNotEmpty()
Expand Down
11 changes: 7 additions & 4 deletions flow/model/cdc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,20 @@ func (r *CDCStream[T]) GetLastCheckpoint() int64 {
return r.lastCheckpointID.Load()
}

func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) {
func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) error {
logger := logger.LoggerFromCtx(ctx)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case r.records <- record:
return
case <-time.After(10 * time.Second):
return nil
case <-ticker.C:
logger.Warn("waiting on adding record to stream", slog.Any("record", record))
case <-ctx.Done():
logger.Warn("context cancelled while adding record to stream", slog.Any("record", record))
return
return ctx.Err()
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions flow/pua/stream_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func AttachToCdcStream(
onErr context.CancelCauseFunc,
) *model.CDCStream[model.RecordItems] {
outstream := model.NewCDCStream[model.RecordItems](0)

handleErr := func(err error) {
onErr(err)
<-ctx.Done()
for range stream.GetRecords() {
// still read records to make sure input closes first
}
}

go func() {
if stream.WaitAndCheckEmpty() {
outstream.SignalAsEmpty()
Expand All @@ -52,14 +61,14 @@ func AttachToCdcStream(
ls.Push(lfn)
ls.Push(LuaRecord.New(ls, record))
if err := ls.PCall(1, 0, nil); err != nil {
onErr(err)
<-ctx.Done()
for range stream.GetRecords() {
// still read records to make sure input closes first
}
handleErr(err)
break
}
err := outstream.AddRecord(ctx, record)
if err != nil {
handleErr(err)
break
}
outstream.AddRecord(ctx, record)
}
}
outstream.SchemaDeltas = stream.SchemaDeltas
Expand Down

0 comments on commit a51daba

Please sign in to comment.