diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 278f0ed863..3386680565 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -399,6 +399,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl // treat all relation messages as correponding to parent if partitioned. msg.RelationID = p.getParentRelIdIfPartitioned(msg.RelationID) + if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists { + return nil, nil + } + // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist // the state of the relation message somewhere