Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more information logs for UI logs table #2220

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@
) (*protos.SetupNormalizedTableBatchOutput, error) {
logger := activity.GetLogger(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables")
conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
Expand Down Expand Up @@ -247,6 +248,7 @@
})
defer shutdown()

a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have this twice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think it's meant to say conn.StartupSetupNormalizedTables has created tables, beginning ingestion. Wording needs to be updated

tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
for tableIdentifier, tableSchema := range tableNameSchemaMapping {
existing, err := conn.SetupNormalizedTable(
Expand All @@ -270,7 +272,7 @@
logger.Info("table already exists " + tableIdentifier)
}

}

Check failure on line 275 in flow/activities/flowable.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil {
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
Expand Down Expand Up @@ -513,7 +515,7 @@
}
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("obtained partitions for table %s", config.WatermarkTable))

Check failure on line 518 in flow/activities/flowable.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with string concatenation (perfsprint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just log instead of sending an alert? (too noisy)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not alerts. This is for our logs table in UI
I think LogFlowEvent is for alerts ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, but still do we want to do it for all tables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah because we have scarcity of user facing info during initial load - common feedback from customers cc @iskakaushik


return &protos.QRepParitionResult{
Partitions: partitions,
Expand Down
4 changes: 3 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (a *SnapshotActivity) SetupReplication(
) (*protos.SetupReplicationOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Setting up replication slot and publication")
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for alerting, we need this


conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName)
Expand Down Expand Up @@ -98,6 +98,8 @@ func (a *SnapshotActivity) SetupReplication(
connector: conn,
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete")

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
Expand Down
Loading