Skip to content

Commit

Permalink
reduce setup srcconn spam and hushWarn for message types (#2205)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 30, 2024
1 parent 37546bb commit 159d868
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
12 changes: 10 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
var none TPull
logger := activity.GetLogger(ctx)
attempt := 0
waitInterval := time.Second
// try for 5 minutes, once per second
// after that, try indefinitely every minute
for {
a.CdcCacheRw.RLock()
entry, ok := a.CdcCache[sessionID]
Expand All @@ -63,7 +66,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
}
return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
attempt += 1
if attempt > 2 {
logger.Info("waiting on source connector setup",
Expand All @@ -72,7 +75,12 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
if err := ctx.Err(); err != nil {
return none, err
}
time.Sleep(time.Second)
time.Sleep(waitInterval)
if attempt == 300 {
logger.Info("source connector not setup in time, transition to slow wait",
slog.String("sessionID", sessionID))
waitInterval = time.Minute
}
}
}

Expand Down
35 changes: 20 additions & 15 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type PostgresCDCSource struct {
childToParentRelIDMapping map[uint32]uint32

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string
catalogPool *pgxpool.Pool
hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
flowJobName string
}

type PostgresCDCConfig struct {
Expand All @@ -59,18 +60,19 @@ type PostgresCDCConfig struct {
// Create a new PostgresCDCSource
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
tableNameMapping: cdcConfig.TableNameMapping,
tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping,
relationMessageMapping: cdcConfig.RelationMessageMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: nil,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
PostgresConnector: c,
srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
tableNameMapping: cdcConfig.TableNameMapping,
tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping,
relationMessageMapping: cdcConfig.RelationMessageMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: nil,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
}
}

Expand Down Expand Up @@ -678,7 +680,10 @@ func processMessage[Items model.Items](
}, nil

default:
logger.Debug(fmt.Sprintf("%T not supported", msg))
if _, ok := p.hushWarnUnhandledMessageType[msg.Type()]; !ok {
logger.Warn(fmt.Sprintf("Unhandled message type: %T", msg))
p.hushWarnUnhandledMessageType[msg.Type()] = struct{}{}
}
}

return nil, nil
Expand Down

0 comments on commit 159d868

Please sign in to comment.