Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 5, 2024
2 parents 8a4fe91 + 6e7a219 commit a851ee0
Show file tree
Hide file tree
Showing 45 changed files with 959 additions and 519 deletions.
31 changes: 31 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,34 @@ func (a *FlowableActivity) RemoveTablesFromCatalog(

return err
}

func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowName string) error {
logger := log.With(activity.GetLogger(ctx),
slog.String(string(shared.FlowNameKey), flowName))
tx, err := a.CatalogPool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction to remove flow entries from catalog: %w", err)
}
defer shared.RollbackTx(tx, slog.Default())

if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil {
return fmt.Errorf("unable to clear table_schema_mapping in catalog: %w", err)
}

ct, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}
if ct.RowsAffected() == 0 {
logger.Warn("flow entry not found in catalog, 0 records deleted")
} else {
logger.Info("flow entries removed from catalog",
slog.Int("rowsAffected", int(ct.RowsAffected())))
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction to remove flow entries from catalog: %w", err)
}

return nil
}
12 changes: 10 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
var none TPull
logger := activity.GetLogger(ctx)
attempt := 0
waitInterval := time.Second
// try for 5 minutes, once per second
// after that, try indefinitely every minute
for {
a.CdcCacheRw.RLock()
entry, ok := a.CdcCache[sessionID]
Expand All @@ -63,7 +66,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
}
return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
attempt += 1
if attempt > 2 {
logger.Info("waiting on source connector setup",
Expand All @@ -72,7 +75,12 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
if err := ctx.Err(); err != nil {
return none, err
}
time.Sleep(time.Second)
time.Sleep(waitInterval)
if attempt == 300 {
logger.Info("source connector not setup in time, transition to slow wait",
slog.String("sessionID", sessionID))
waitInterval = time.Minute
}
}
}

Expand Down
15 changes: 12 additions & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

"github.com/PeerDB-io/peer-flow/auth"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/middleware"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -213,14 +213,23 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to create Temporal client: %w", err)
}

options, err := auth.AuthGrpcMiddleware([]string{
authGrpcMiddleware, err := middleware.AuthGrpcMiddleware([]string{
grpc_health_v1.Health_Check_FullMethodName,
grpc_health_v1.Health_Watch_FullMethodName,
})
if err != nil {
return err
}
grpcServer := grpc.NewServer(options...)

requestLoggingMiddleware := middleware.RequestLoggingMiddleWare()

// Interceptors are executed in the order they are passed to, so unauthorized requests are not logged
interceptors := grpc.ChainUnaryInterceptor(
authGrpcMiddleware,
requestLoggingMiddleware,
)

grpcServer := grpc.NewServer(interceptors)

catalogPool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
Expand Down
103 changes: 37 additions & 66 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,31 +175,6 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return shared.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg)
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
ctx context.Context,
flowName string,
) error {
tx, err := h.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("unable to begin tx to remove flow entry in catalog: %w", err)
}
defer shared.RollbackTx(tx, slog.Default())

if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil {
return fmt.Errorf("unable to clear table_schema_mapping to remove flow entry in catalog: %w", err)
}

if _, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName); err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("unable to commit remove flow entry in catalog: %w", err)
}

return nil
}

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
Expand Down Expand Up @@ -295,56 +270,52 @@ func (h *FlowRequestHandler) shutdownFlow(
if err != nil {
slog.Error("unable to check if workflow is cdc", logs, slog.Any("error", err))
return fmt.Errorf("unable to determine if workflow is cdc: %w", err)
} else if isCdc {
cdcConfig, err := h.getFlowConfigFromCatalog(ctx, flowJobName)
}
var cdcConfig *protos.FlowConnectionConfigs
if isCdc {
cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName)
if err != nil {
slog.Error("unable to get cdc config from catalog", logs, slog.Any("error", err))
return fmt.Errorf("unable to get cdc config from catalog: %w", err)
}
workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
}
}
dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: dropFlowWorkflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
}

dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions,
peerflow.DropFlowWorkflow, &protos.DropFlowInput{
FlowJobName: flowJobName,
SourcePeerName: cdcConfig.SourceName,
DestinationPeerName: cdcConfig.DestinationName,
DropFlowStats: deleteStats,
})
if err != nil {
slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to start DropFlow workflow: %w", err)
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions,
peerflow.DropFlowWorkflow, &protos.DropFlowInput{
FlowJobName: flowJobName,
DropFlowStats: deleteStats,
FlowConnectionConfigs: cdcConfig,
})
if err != nil {
slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()
errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil {
slog.Error("unable to remove flow job entry", logs, slog.Any("error", err))
return err
}

return nil
Expand Down
79 changes: 79 additions & 0 deletions flow/cmd/logged_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package cmd

import (
"context"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
)

type LoggedWorkflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
Next interceptor.WorkflowInboundInterceptor
}

func NewLoggedWorkflowInboundInterceptor(next interceptor.WorkflowInboundInterceptor) *LoggedWorkflowInboundInterceptor {
return &LoggedWorkflowInboundInterceptor{
WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next},
Next: next,
}
}

func (w *LoggedWorkflowInboundInterceptor) ExecuteWorkflow(
ctx workflow.Context,
in *interceptor.ExecuteWorkflowInput,
) (interface{}, error) {
// Workflow starts here
result, err := w.Next.ExecuteWorkflow(ctx, in)
// Workflow ends here
return result, err
}

type LoggedActivityInboundInterceptor struct {
interceptor.ActivityInboundInterceptorBase
Next interceptor.ActivityInboundInterceptor
}

func NewLoggedActivityInboundInterceptor(next interceptor.ActivityInboundInterceptor) *LoggedActivityInboundInterceptor {
return &LoggedActivityInboundInterceptor{
ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next},
Next: next,
}
}

func (c *LoggedActivityInboundInterceptor) ExecuteActivity(
ctx context.Context,
in *interceptor.ExecuteActivityInput,
) (interface{}, error) {
// Activity starts here
out, err := c.Next.ExecuteActivity(ctx, in)
// Activity ends here
return out, err
}

type LoggedWorkerInterceptor struct {
interceptor.WorkerInterceptorBase
}

func (c LoggedWorkerInterceptor) InterceptActivity(
ctx context.Context,
next interceptor.ActivityInboundInterceptor,
) interceptor.ActivityInboundInterceptor {
return NewLoggedActivityInboundInterceptor(next)
}

func (c LoggedWorkerInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
// Workflow intercepted here
intercepted := NewLoggedWorkflowInboundInterceptor(next)
// Workflow intercepting ends here
return intercepted
}

func NewLoggedWorkerInterceptor() *LoggedWorkerInterceptor {
return &LoggedWorkerInterceptor{
WorkerInterceptorBase: interceptor.WorkerInterceptorBase{},
}
}
Loading

0 comments on commit a851ee0

Please sign in to comment.