diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 66040396ce..db04efea30 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -53,6 +53,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, var none TPull logger := activity.GetLogger(ctx) attempt := 0 + waitInterval := time.Second + // try for 5 minutes, once per second + // after that, try indefinitely every minute for { a.CdcCacheRw.RLock() entry, ok := a.CdcCache[sessionID] @@ -63,7 +66,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, } return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector) } - activity.RecordHeartbeat(ctx, "wait another second for source connector") + activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval)) attempt += 1 if attempt > 2 { logger.Info("waiting on source connector setup", @@ -72,7 +75,12 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, if err := ctx.Err(); err != nil { return none, err } - time.Sleep(time.Second) + time.Sleep(waitInterval) + if attempt == 300 { + logger.Info("source connector not setup in time, transition to slow wait", + slog.String("sessionID", sessionID)) + waitInterval = time.Minute + } } } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 91eaf3eba7..a355cfa00e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -40,8 +40,9 @@ type PostgresCDCSource struct { childToParentRelIDMapping map[uint32]uint32 // for storing schema delta audit logs to catalog - catalogPool *pgxpool.Pool - flowJobName string + catalogPool *pgxpool.Pool + hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{} + flowJobName string } type PostgresCDCConfig struct { @@ -59,18 +60,19 @@ type PostgresCDCConfig struct { // Create a new PostgresCDCSource func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ - PostgresConnector: c, - srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, - tableNameMapping: cdcConfig.TableNameMapping, - tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, - relationMessageMapping: cdcConfig.RelationMessageMapping, - slot: cdcConfig.Slot, - publication: cdcConfig.Publication, - childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, - typeMap: pgtype.NewMap(), - commitLock: nil, - catalogPool: cdcConfig.CatalogPool, - flowJobName: cdcConfig.FlowJobName, + PostgresConnector: c, + srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, + tableNameMapping: cdcConfig.TableNameMapping, + tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, + relationMessageMapping: cdcConfig.RelationMessageMapping, + slot: cdcConfig.Slot, + publication: cdcConfig.Publication, + childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, + typeMap: pgtype.NewMap(), + commitLock: nil, + catalogPool: cdcConfig.CatalogPool, + flowJobName: cdcConfig.FlowJobName, + hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}), } } @@ -678,7 +680,10 @@ func processMessage[Items model.Items]( }, nil default: - logger.Debug(fmt.Sprintf("%T not supported", msg)) + if _, ok := p.hushWarnUnhandledMessageType[msg.Type()]; !ok { + logger.Warn(fmt.Sprintf("Unhandled message type: %T", msg)) + p.hushWarnUnhandledMessageType[msg.Type()] = struct{}{} + } } return nil, nil