diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 9d1b61224a..5714288c75 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -346,7 +346,10 @@ func PullCdcRecords[Items model.Items]( if err != nil { return err } - records.AddRecord(ctx, rec) + err = records.AddRecord(ctx, rec) + if err != nil { + return err + } if cdcRecordsStorage.Len() == 1 { records.SignalAsNotEmpty() diff --git a/flow/model/cdc_stream.go b/flow/model/cdc_stream.go index 34c91d3448..523a10709f 100644 --- a/flow/model/cdc_stream.go +++ b/flow/model/cdc_stream.go @@ -43,17 +43,20 @@ func (r *CDCStream[T]) GetLastCheckpoint() int64 { return r.lastCheckpointID.Load() } -func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) { +func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) error { logger := logger.LoggerFromCtx(ctx) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { select { case r.records <- record: - return - case <-time.After(10 * time.Second): + return nil + case <-ticker.C: logger.Warn("waiting on adding record to stream", slog.Any("record", record)) case <-ctx.Done(): logger.Warn("context cancelled while adding record to stream", slog.Any("record", record)) - return + return ctx.Err() } } } diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go index 4ea7548067..a03e68c919 100644 --- a/flow/pua/stream_adapter.go +++ b/flow/pua/stream_adapter.go @@ -42,6 +42,15 @@ func AttachToCdcStream( onErr context.CancelCauseFunc, ) *model.CDCStream[model.RecordItems] { outstream := model.NewCDCStream[model.RecordItems](0) + + handleErr := func(err error) { + onErr(err) + <-ctx.Done() + for range stream.GetRecords() { + // still read records to make sure input closes first + } + } + go func() { if stream.WaitAndCheckEmpty() { outstream.SignalAsEmpty() @@ -52,14 +61,14 @@ func AttachToCdcStream( ls.Push(lfn) ls.Push(LuaRecord.New(ls, record)) if err := ls.PCall(1, 0, nil); err != nil { - onErr(err) - <-ctx.Done() - for range stream.GetRecords() { - // still read records to make sure input closes first - } + handleErr(err) + break + } + err := outstream.AddRecord(ctx, record) + if err != nil { + handleErr(err) break } - outstream.AddRecord(ctx, record) } } outstream.SchemaDeltas = stream.SchemaDeltas