Skip to content

Commit

Permalink
cleanup, remove ]: from logs (#2176)
Browse files Browse the repository at this point in the history
split out from #2168
  • Loading branch information
serprex authored Oct 23, 2024
1 parent 7af2321 commit 724f1bc
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 44 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
// we can ignore the error
return nil
}
return fmt.Errorf("[RemoveTablesFromRawTable]:failed to get destination connector: %w", err)
return fmt.Errorf("[RemoveTablesFromRawTable] failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand Down
43 changes: 16 additions & 27 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ func (h *FlowRequestHandler) shutdownFlow(
slog.String("workflowId", workflowID),
)

err = h.handleCancelWorkflow(ctx, workflowID, "")
if err != nil {
if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
slog.Error("unable to cancel workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}
Expand Down Expand Up @@ -317,9 +316,7 @@ func (h *FlowRequestHandler) shutdownFlow(
DropFlowStats: deleteStats,
})
if err != nil {
slog.Error("unable to start DropFlow workflow",
logs,
slog.Any("error", err))
slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
return fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

Expand All @@ -334,10 +331,7 @@ func (h *FlowRequestHandler) shutdownFlow(
select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully",
logs,
slog.Any("error", err),
)
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
Expand All @@ -349,10 +343,7 @@ func (h *FlowRequestHandler) shutdownFlow(
}

if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil {
slog.Error("unable to remove flow job entry",
slog.String(string(shared.FlowNameKey), flowJobName),
slog.Any("error", err),
slog.String("workflowId", workflowID))
slog.Error("unable to remove flow job entry", logs, slog.Any("error", err))
return err
}

Expand All @@ -363,36 +354,37 @@ func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
slog.Info("FlowStateChange called", slog.String("flowJobName", req.FlowJobName), slog.Any("req", req))
logs := slog.String("flowJobName", req.FlowJobName)
slog.Info("FlowStateChange called", logs, slog.Any("req", req))
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
slog.Error("[flow-state-change]unable to get workflowID", slog.Any("error", err))
slog.Error("[flow-state-change] unable to get workflowID", logs, slog.Any("error", err))
return nil, err
}
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
slog.Error("[flow-state-change]unable to get workflow status", slog.Any("error", err))
slog.Error("[flow-state-change] unable to get workflow status", logs, slog.Any("error", err))
return nil, err
}

if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil {
err = model.CDCDynamicPropertiesSignal.SignalClientWorkflow(
err := model.CDCDynamicPropertiesSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
workflowID,
"",
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
)
if err != nil {
slog.Error("unable to signal workflow", slog.Any("error", err))
slog.Error("unable to signal workflow", logs, slog.Any("error", err))
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
}

if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN {
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
slog.Info("[flow-state-change]: received pause request")
slog.Info("[flow-state-change] received pause request", logs)
err = model.FlowSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
Expand All @@ -402,7 +394,7 @@ func (h *FlowRequestHandler) FlowStateChange(
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
slog.Info("[flow-state-change]: received resume request")
slog.Info("[flow-state-change] received resume request", logs)
err = model.FlowSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
Expand All @@ -412,16 +404,16 @@ func (h *FlowRequestHandler) FlowStateChange(
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
slog.Info("[flow-state-change]: received drop mirror request")
slog.Info("[flow-state-change] received drop mirror request", logs)
err = h.shutdownFlow(ctx, req.FlowJobName, req.DropMirrorStats)
} else if req.RequestedFlowState != currState {
slog.Error("illegal state change requested", slog.Any("requestedFlowState", req.RequestedFlowState),
slog.Error("illegal state change requested", logs, slog.Any("requestedFlowState", req.RequestedFlowState),
slog.Any("currState", currState))
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
slog.Error("unable to signal workflow", slog.Any("error", err))
slog.Error("unable to signal workflow", logs, slog.Any("error", err))
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
}
Expand All @@ -432,11 +424,9 @@ func (h *FlowRequestHandler) FlowStateChange(
func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowID, runID string) error {
errChan := make(chan error, 1)

// Create a new context with timeout for CancelWorkflow
ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// Call CancelWorkflow in a goroutine
go func() {
err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID)
errChan <- err
Expand All @@ -452,8 +442,7 @@ func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowI
}
}
case <-time.After(1 * time.Minute):
// If 1 minute has passed and we haven't received an error, terminate the workflow
slog.Error("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.")
slog.Error("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.", slog.String("workflowId", workflowID))
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions flow/cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (h *FlowRequestHandler) GetDynamicSettings(
) (*protos.GetDynamicSettingsResponse, error) {
rows, err := h.pool.Query(ctx, "select config_name,config_value from dynamic_settings")
if err != nil {
slog.Error("[GetDynamicConfigs]: failed to query settings", slog.Any("error", err))
slog.Error("[GetDynamicConfigs] failed to query settings", slog.Any("error", err))
return nil, err
}
settings := slices.Clone(peerdbenv.DynamicSettings[:])
Expand All @@ -33,7 +33,7 @@ func (h *FlowRequestHandler) GetDynamicSettings(
}
return nil
}); err != nil {
slog.Error("[GetDynamicConfigs]: failed to collect rows", slog.Any("error", err))
slog.Error("[GetDynamicConfigs] failed to collect rows", slog.Any("error", err))
return nil, err
}

Expand All @@ -58,7 +58,7 @@ func (h *FlowRequestHandler) PostDynamicSetting(
_, err := h.pool.Exec(ctx, `insert into dynamic_settings (config_name, config_value) values ($1, $2)
on conflict (config_name) do update set config_value = $2`, req.Name, req.Value)
if err != nil {
slog.Error("[PostDynamicConfig]: failed to execute update setting", slog.Any("error", err))
slog.Error("[PostDynamicConfig] failed to execute update setting", slog.Any("error", err))
return nil, err
}
return &protos.PostDynamicSettingResponse{}, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (c *PostgresConnector) createSlotAndPublication(
parsedSrcTableName, err := utils.ParseSchemaTable(srcTableName)
if err != nil {
signal.SlotCreated <- SlotCreationResult{
Err: fmt.Errorf("[publication-creation]:source table identifier %s is invalid", srcTableName),
Err: fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName),
}
return
}
Expand Down
3 changes: 1 addition & 2 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error
StartToCloseTimeout: 5 * time.Minute,
})
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
ctx = workflow.WithDataConverter(ctx,
converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter()))
ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter()))

dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/tables.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use client';
import { DropDialog } from '@/components/DropDialog';
import DropDialog from '@/components/DropDialog';
import MirrorLink from '@/components/MirrorLink';
import NewButton from '@/components/NewButton';
import PeerButton from '@/components/PeerComponent';
Expand Down
2 changes: 1 addition & 1 deletion ui/app/peers/peersTable.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use client';
import { DropDialog } from '@/components/DropDialog';
import DropDialog from '@/components/DropDialog';
import PeerButton from '@/components/PeerComponent';
import PeerTypeLabel, {
DBTypeToGoodText,
Expand Down
2 changes: 1 addition & 1 deletion ui/app/scripts/list.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use client';
import { DropDialog } from '@/components/DropDialog';
import DropDialog from '@/components/DropDialog';
import { Script } from '@/grpc_generated/route';
import { Button } from '@/lib/Button/Button';
import { Label } from '@/lib/Label/Label';
Expand Down
2 changes: 1 addition & 1 deletion ui/components/AlertDropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Button } from '@/lib/Button/Button';
import { Icon } from '@/lib/Icon';
import * as DropdownMenu from '@radix-ui/react-dropdown-menu';
import { useState } from 'react';
import { DropDialog } from './DropDialog';
import DropDialog from './DropDialog';
const AlertDropdown = ({
disable,
alertId,
Expand Down
12 changes: 6 additions & 6 deletions ui/components/DropDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ interface deleteScriptArgs {
scriptId: number;
}

export const handleDropMirror = async (
async function handleDropMirror(
dropArgs: dropMirrorArgs,
setLoading: Dispatch<SetStateAction<boolean>>,
setMsg: Dispatch<SetStateAction<string>>,
dropStats: boolean
) => {
) {
setLoading(true);
const res = await changeFlowState(
dropArgs.flowJobName,
Expand All @@ -50,15 +50,15 @@ export const handleDropMirror = async (
window.location.reload();

return true;
};
}

export const DropDialog = ({
export default function DropDialog({
mode,
dropArgs,
}: {
mode: 'PEER' | 'MIRROR' | 'ALERT' | 'SCRIPT';
dropArgs: dropMirrorArgs | dropPeerArgs | deleteAlertArgs | deleteScriptArgs;
}) => {
}) {
const [loading, setLoading] = useState(false);
const [msg, setMsg] = useState('');
const [dropStats, setDropStats] = useState(true);
Expand Down Expand Up @@ -209,4 +209,4 @@ export const DropDialog = ({
</div>
</Dialog>
);
};
}

0 comments on commit 724f1bc

Please sign in to comment.