refactor fetching and pushing of last offset (#1329)
- Moves fetching of the last offset into the start flow and removes it from the peer flow (see the sketch below the change summary)
Amogh-Bharadwaj authored Feb 19, 2024
1 parent f6a7372 commit 91da856
Showing 12 changed files with 56 additions and 88 deletions.
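Before the file-by-file diff, a minimal sketch of the refactor, using hypothetical stand-in types (`destConnector`, `pullRecordsRequest`, `startFlow`) for the real connector, model, and activity types shown in the diff: the StartFlow activity now asks the destination connector for the last offset itself, so the separate GetLastSyncedID activity and the LastSyncState field on StartFlowInput can go away.

```go
package sketch

import (
	"fmt"
	"log"
)

// destConnector is a hypothetical stand-in for the CDC sync connector; the
// real GetLastOffset lives on the connector implementations in flow/connectors.
type destConnector interface {
	GetLastOffset(flowJobName string) (int64, error)
}

// pullRecordsRequest loosely mirrors model.PullRecordsRequest; only the fields
// relevant to this change are shown.
type pullRecordsRequest struct {
	FlowJobName string
	LastOffset  int64
}

// startFlow sketches the new shape of StartFlow: the activity fetches the last
// offset from the destination directly, instead of the sync workflow running a
// GetLastSyncedID activity and passing the checkpoint in through
// StartFlowInput.LastSyncState.
func startFlow(dst destConnector, flowJobName string) (*pullRecordsRequest, error) {
	lastOffset, err := dst.GetLastOffset(flowJobName)
	if err != nil {
		return nil, fmt.Errorf("failed to get last offset: %w", err)
	}
	log.Printf("last synced ID from destination peer - %d", lastOffset)

	// The offset feeds the source pull directly, replacing the old
	// input.LastSyncState.Checkpoint plumbing.
	return &pullRecordsRequest{FlowJobName: flowJobName, LastOffset: lastOffset}, nil
}
```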
43 changes: 16 additions & 27 deletions flow/activities/flowable.go
@@ -83,26 +83,6 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
return nil
}

// GetLastSyncedID implements GetLastSyncedID.
func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

// EnsurePullability implements EnsurePullability.
func (a *FlowableActivity) EnsurePullability(
ctx context.Context,
@@ -218,12 +198,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
defer shutdown()

slog.InfoContext(ctx, "getting last offset from destination peer")
// Get the last offset from the destination
lastOffset, getLastOffsetErr := dstConn.GetLastOffset(input.FlowConnectionConfigs.FlowJobName)
if getLastOffsetErr != nil {
return nil, getLastOffsetErr
}

msg := fmt.Sprintf("last synced ID from destination peer - %d\n", lastOffset)
slog.InfoContext(ctx, msg)
// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
@@ -233,7 +222,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
LastOffset: lastOffset,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.FlowConnectionConfigs.IdleTimeoutSeconds),
@@ -394,7 +383,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()
@@ -463,7 +452,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})
defer shutdown()
@@ -601,7 +590,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}

shutdown := utils.HeartbeatRoutine(ctx, 1*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()
@@ -644,7 +633,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
return err
}

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})
defer shutdown()
@@ -953,7 +942,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return nil
})

shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
})
defer shutdown()
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
@@ -380,7 +380,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
stream *model.QRecordStream,
flowName string,
) (int, error) {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
shutdown := utils.HeartbeatRoutine(s.connector.ctx,
func() string {
return fmt.Sprintf("writing to avro stage for objectFolder %s and staging table %s",
objectFolder, stagingTable)
6 changes: 3 additions & 3 deletions flow/connectors/eventhub/eventhub.go
@@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error {
allErrors = errors.Join(allErrors, err)
}

err = c.hubManager.Close(context.Background())
err = c.hubManager.Close(c.ctx)
if err != nil {
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
@@ -132,7 +132,7 @@ func (c *EventHubConnector) processBatch(
lastUpdatedOffset := int64(0)

numRecords := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords.Load(), flowJobName,
@@ -201,7 +201,7 @@ func (c *EventHubConnector) processBatch(
}

curNumRecords := numRecords.Load()
if curNumRecords%1000 == 0 {
if curNumRecords%10000 == 0 {
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

27 changes: 19 additions & 8 deletions flow/connectors/eventhub/hubmanager.go
@@ -7,6 +7,7 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -82,7 +83,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE

var hubConnectOK bool
var hub any
hub, hubConnectOK = m.hubs.Load(name)
hub, hubConnectOK = m.hubs.Load(name.Eventhub)
if hubConnectOK {
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
@@ -94,7 +95,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
if closeError != nil {
slog.Error("failed to close producer client", slog.Any("error", closeError))
}
m.hubs.Delete(name)
m.hubs.Delete(name.Eventhub)
hubConnectOK = false
}
}
@@ -111,7 +112,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
}
m.hubs.Store(name, hub)
m.hubs.Store(name.Eventhub, hub)
return hub, nil
}

@@ -127,18 +128,28 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu

func (m *EventHubManager) Close(ctx context.Context) error {
var allErrors error

numHubsClosed := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
})
defer shutdown()
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
slog.InfoContext(ctx, "closing eventhub client",
slog.Uint64("numClosed", uint64(numHubsClosed.Load())),
slog.String("Currently closing", fmt.Sprintf("%v", key)))
client := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, client)
if err != nil {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", key), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}

numHubsClosed.Add(1)
return true
})

slog.InfoContext(ctx, "closed all eventhub clients", slog.Any("numClosed", numHubsClosed.Load()))

return allErrors
}

4 changes: 1 addition & 3 deletions flow/connectors/postgres/cdc.go
@@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
@@ -344,8 +344,6 @@ func (p *PostgresCDCSource) consumeStream(
}
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()

utils.RecordHeartbeatWithRecover(p.ctx, "consumeStream ReceiveMessage")
ctxErr := p.ctx.Err()
if ctxErr != nil {
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
6 changes: 3 additions & 3 deletions flow/connectors/postgres/postgres.go
@@ -578,7 +578,7 @@ func (c *PostgresConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
}

@@ -695,7 +695,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab

tableExistsMapping[tableIdentifier] = false
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
}

err = createNormalizedTablesTx.Commit(c.ctx)
@@ -800,7 +800,7 @@ func (c *PostgresConnector) EnsurePullability(
},
},
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/geo"
@@ -84,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(qe.ctx, func() string {
qe.logger.Info(fmt.Sprintf("still running '%s'...", q))
return fmt.Sprintf("running '%s'", q)
})
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
@@ -279,7 +279,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)

shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})
defer shutdown()
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
@@ -228,7 +228,7 @@ func (c *SnowflakeConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
}

return &protos.GetTableSchemaBatchOutput{
3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
@@ -8,7 +8,6 @@ import (
"log/slog"
"os"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model"
@@ -131,7 +130,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
numRows := atomic.Uint32{}

if p.ctx != nil {
shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})
21 changes: 9 additions & 12 deletions flow/connectors/utils/heartbeat.go
@@ -3,30 +3,31 @@ package utils
import (
"context"
"fmt"
"log/slog"
"time"

"go.temporal.io/sdk/activity"
)

func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) func() {
shutdown := make(chan struct{})
go func() {
counter := 0
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
RecordHeartbeat(ctx, msg)
select {
case <-shutdown:
return
case <-ctx.Done():
return
case <-time.After(interval):
case <-ticker.C:
}
}
}()
@@ -35,12 +36,8 @@ func HeartbeatRoutine(

// if the functions are being called outside the context of a Temporal workflow,
// activity.RecordHeartbeat panics, this is a bandaid for that.
func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) {
defer func() {
if r := recover(); r != nil {
slog.Warn("ignoring panic from activity.RecordHeartbeat")
slog.Warn("this can happen when function is invoked outside of a Temporal workflow")
}
}()
activity.RecordHeartbeat(ctx, details...)
func RecordHeartbeat(ctx context.Context, details ...interface{}) {
if activity.IsActivity(ctx) {
activity.RecordHeartbeat(ctx, details...)
}
}
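Call sites across the connectors simply drop the interval argument, since HeartbeatRoutine now ticks on a fixed 15-second interval and RecordHeartbeat no-ops outside a Temporal activity context. A minimal usage sketch follows; `doWork` is a hypothetical caller, while the import path is the repo's own utils package:

```go
package sketch

import (
	"context"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

// doWork is a hypothetical caller showing the simplified API: no interval to
// pass, and no panic-recovery needed when ctx is not an activity context.
func doWork(ctx context.Context) {
	shutdown := utils.HeartbeatRoutine(ctx, func() string {
		return "processing records"
	})
	defer shutdown()

	// ... long-running work; extra heartbeats are now a plain call that is
	// silently skipped outside a Temporal activity.
	utils.RecordHeartbeat(ctx, "still processing records")
}
```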
25 changes: 0 additions & 25 deletions flow/workflows/sync_flow.go
@@ -39,30 +39,6 @@ func (s *SyncFlowExecution) executeSyncFlow(
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", s.CDCFlowName)

syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
WaitForCancellation: true,
})

// execute GetLastSyncedID on destination peer
lastSyncInput := &protos.GetLastSyncedIDInput{
PeerConnectionConfig: config.Destination,
FlowJobName: s.CDCFlowName,
}

lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
var dstSyncState *protos.LastSyncState
if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil {
return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err)
}

if dstSyncState != nil {
msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint)
s.logger.Info(msg)
} else {
s.logger.Info("no last synced ID from destination peer")
}

startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: 30 * time.Second,
@@ -72,7 +48,6 @@ func (s *SyncFlowExecution) executeSyncFlow(
// execute StartFlow on the peers to start the flow
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: config,
LastSyncState: dstSyncState,
SyncFlowOptions: opts,
RelationMessageMapping: relationMessageMapping,
}